This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new dba644d27 [flink] Basic $binlog read support without pushdown optimizations for pk table (#2525)
dba644d27 is described below

commit dba644d27aa6e9788327587da702e80a3c033ebc
Author: MehulBatra <[email protected]>
AuthorDate: Mon Feb 2 14:53:54 2026 +0530

    [flink] Basic $binlog read support without pushdown optimizations for pk table (#2525)
---
 .../org/apache/fluss/metadata/TableDescriptor.java |   4 +
 .../source/Flink118BinlogVirtualTableITCase.java   |  21 ++
 .../source/Flink119BinlogVirtualTableITCase.java   |  21 ++
 .../source/Flink120BinlogVirtualTableITCase.java   |  21 ++
 .../source/Flink22BinlogVirtualTableITCase.java    |  21 ++
 .../apache/fluss/flink/FlinkConnectorOptions.java  |   8 +-
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 114 ++++++-
 .../fluss/flink/catalog/FlinkTableFactory.java     |  53 +++
 ...ableSource.java => BinlogFlinkTableSource.java} | 123 ++-----
 .../flink/source/ChangelogFlinkTableSource.java    |  18 +-
 .../deserializer/BinlogDeserializationSchema.java  |  85 +++++
 .../flink/source/emitter/FlinkRecordEmitter.java   |  26 +-
 .../fluss/flink/utils/BinlogRowConverter.java      | 174 ++++++++++
 .../fluss/flink/utils/ChangelogRowConverter.java   |  17 +-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |  52 ++-
 .../flink/source/BinlogVirtualTableITCase.java     | 362 +++++++++++++++++++++
 .../flink/source/ChangelogVirtualTableITCase.java  |  49 +--
 .../fluss/flink/utils/BinlogRowConverterTest.java  | 249 ++++++++++++++
 .../flink/utils/ChangelogRowConverterTest.java     |  18 +-
 19 files changed, 1285 insertions(+), 151 deletions(-)
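For readers skimming the patch, the sketch below shows how the new $binlog
virtual table is meant to be consumed once this change is in place. It is
illustrative only: the catalog name, the bootstrap address, and the "orders"
primary key table are assumptions for the example, not part of this commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BinlogReadExample {
        public static void main(String[] args) {
            // Streaming mode; the $binlog source is log-based and insert-only.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Register the Fluss catalog (the bootstrap address is a placeholder).
            tEnv.executeSql(
                    "CREATE CATALOG fluss_catalog WITH ("
                            + " 'type' = 'fluss',"
                            + " 'bootstrap.servers' = 'localhost:9123')");
            tEnv.executeSql("USE CATALOG fluss_catalog");

            // Each change on the assumed primary key table "orders" arrives as one
            // insert-only row: change type, log offset, commit timestamp, and the
            // nested before/after images of the record.
            tEnv.executeSql(
                            "SELECT _change_type, _log_offset, _commit_timestamp,"
                                    + " `before`.id AS old_id, `after`.id AS new_id"
                                    + " FROM `orders$binlog`")
                    .print();
        }
    }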

diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
index 4c8ecbd79..9f18dd401 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
@@ -65,6 +65,10 @@ public final class TableDescriptor implements Serializable {
     public static final String LOG_OFFSET_COLUMN = "_log_offset";
     public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";
 
+    // column names for $binlog virtual table nested row fields
+    public static final String BEFORE_COLUMN = "before";
+    public static final String AFTER_COLUMN = "after";
+
     private final Schema schema;
     private final @Nullable String comment;
     private final List<String> partitionKeys;
diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..7dcd9e45a
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.18. */
+public class Flink118BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..074a300f4
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.19. */
+public class Flink119BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..354f8f198
--- /dev/null
+++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.20. */
+public class Flink120BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..d73dad613
--- /dev/null
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 2.2. */
+public class Flink22BinlogVirtualTableITCase extends BinlogVirtualTableITCase {}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index c62292c69..6fd5d147f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -245,7 +245,13 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "The serialized base64 bytes of refresh handler of 
materialized table.");
 
-    // ------------------------------------------------------------------------------------------
+    /** Internal option to indicate whether the base table is partitioned for $binlog sources. */
+    public static final ConfigOption<Boolean> INTERNAL_BINLOG_IS_PARTITIONED =
+            ConfigOptions.key("_internal.binlog.is-partitioned")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Internal option: indicates whether the base table is partitioned for $binlog virtual tables. Not part of public API.");
 
     /** Startup mode for the fluss scanner, see {@link #SCAN_STARTUP_MODE}. */
     public enum ScanStartupMode implements DescribedEnum {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index c6c434889..5d81c5e5e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -23,6 +23,7 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.adapter.CatalogTableAdapter;
 import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
@@ -39,6 +40,7 @@ import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.IOUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -73,6 +75,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.types.AbstractDataType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -315,11 +318,7 @@ public class FlinkCatalog extends AbstractCatalog {
             return getVirtualChangelogTable(objectPath);
         } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)
                 && !tableName.contains(LAKE_TABLE_SPLITTER)) {
-            // TODO: Implement binlog virtual table in future
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "$binlog virtual tables are not yet supported for table %s",
-                            objectPath));
+            return getVirtualBinlogTable(objectPath);
         }
 
         TablePath tablePath = toTablePath(objectPath);
@@ -960,4 +959,109 @@ public class FlinkCatalog extends AbstractCatalog {
 
         return builder.build();
     }
+
+    /**
+     * Creates a virtual $binlog table by modifying the base table's schema to include metadata
+     * columns and nested before/after ROW fields.
+     */
+    private CatalogBaseTable getVirtualBinlogTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
+        // Extract the base table name (remove $binlog suffix)
+        String virtualTableName = objectPath.getObjectName();
+        String baseTableName =
+                virtualTableName.substring(
+                        0, virtualTableName.length() - BINLOG_TABLE_SUFFIX.length());
+
+        // Get the base table
+        ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName);
+        TablePath baseTablePath = toTablePath(baseObjectPath);
+
+        try {
+            // Retrieve base table info
+            TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
+
+            // $binlog is only supported for primary key tables
+            if (!tableInfo.hasPrimaryKey()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "$binlog virtual tables are only supported for primary key tables. "
+                                        + "Table %s does not have a primary key.",
+                                baseTablePath));
+            }
+
+            // Convert to Flink table
+            CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo);
+
+            if (!(catalogBaseTable instanceof CatalogTable)) {
+                throw new UnsupportedOperationException(
+                        "Virtual $binlog tables are only supported for regular tables");
+            }
+
+            CatalogTable baseTable = (CatalogTable) catalogBaseTable;
+
+            // Build the binlog schema with nested before/after ROW columns
+            Schema originalSchema = baseTable.getUnresolvedSchema();
+            Schema binlogSchema = buildBinlogSchema(originalSchema);
+
+            // Copy options from base table
+            Map<String, String> newOptions = new HashMap<>(baseTable.getOptions());
+            newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+            newOptions.putAll(securityConfigs);
+
+            // Store whether the base table is partitioned for the table source to use.
+            // Since the binlog schema has nested columns, we can't use Flink's partition key mechanism.
+            newOptions.put(
+                    FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(),
+                    String.valueOf(!baseTable.getPartitionKeys().isEmpty()));
+
+            // Create a new CatalogTable with the binlog schema
+            // Binlog virtual tables don't have partition keys at the top level
+            return CatalogTableAdapter.toCatalogTable(
+                    binlogSchema, baseTable.getComment(), Collections.emptyList(), newOptions);
+
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e);
+            if (t instanceof UnsupportedOperationException) {
+                throw (UnsupportedOperationException) t;
+            }
+            if (isTableNotExist(t)) {
+                throw new TableNotExistException(getName(), baseObjectPath);
+            } else {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to get virtual binlog table %s in %s",
+                                objectPath, getName()),
+                        t);
+            }
+        }
+    }
+
+    private Schema buildBinlogSchema(Schema originalSchema) {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // Add metadata columns
+        builder.column("_change_type", STRING().notNull());
+        builder.column("_log_offset", BIGINT().notNull());
+        builder.column("_commit_timestamp", TIMESTAMP_LTZ(3).notNull());
+
+        // Build nested ROW type from original columns for before/after fields
+        // Using UnresolvedField since physCol.getDataType() returns AbstractDataType (unresolved)
+        List<DataTypes.UnresolvedField> rowFields = new ArrayList<>();
+        for (Schema.UnresolvedColumn col : originalSchema.getColumns()) {
+            if (col instanceof Schema.UnresolvedPhysicalColumn) {
+                Schema.UnresolvedPhysicalColumn physCol = (Schema.UnresolvedPhysicalColumn) col;
+                rowFields.add(DataTypes.FIELD(physCol.getName(), physCol.getDataType()));
+            }
+        }
+        AbstractDataType<?> nestedRowType =
+                DataTypes.ROW(rowFields.toArray(new DataTypes.UnresolvedField[0]));
+
+        // Add before and after as nullable nested ROW columns
+        builder.column("before", nestedRowType);
+        builder.column("after", nestedRowType);
+
+        // Note: We don't copy primary keys or watermarks for virtual tables
+
+        return builder.build();
+    }
 }
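To make the derived schema concrete, here is a small sketch (not part of the
patch) of what buildBinlogSchema above produces for an assumed base table
orders (id INT NOT NULL, name STRING): the three metadata columns come first,
and the base columns reappear nested under the nullable before/after rows.
Primary keys and watermarks are intentionally not carried over.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    class BinlogSchemaSketch {
        // Hand-written equivalent of the derived orders$binlog schema;
        // the "orders" columns are assumptions for the example.
        static Schema ordersBinlogSchema() {
            return Schema.newBuilder()
                    .column("_change_type", DataTypes.STRING().notNull())
                    .column("_log_offset", DataTypes.BIGINT().notNull())
                    .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull())
                    // base table columns, nested and nullable as a whole
                    .column(
                            "before",
                            DataTypes.ROW(
                                    DataTypes.FIELD("id", DataTypes.INT().notNull()),
                                    DataTypes.FIELD("name", DataTypes.STRING())))
                    .column(
                            "after",
                            DataTypes.ROW(
                                    DataTypes.FIELD("id", DataTypes.INT().notNull()),
                                    DataTypes.FIELD("name", DataTypes.STRING())))
                    .build();
        }
    }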
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 7c99b6f4b..a7a5608de 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.sink.shuffle.DistributionMode;
+import org.apache.fluss.flink.source.BinlogFlinkTableSource;
 import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
 import org.apache.fluss.flink.source.FlinkTableSource;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -93,6 +94,11 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
             return createChangelogTableSource(context, tableIdentifier, tableName);
         }
 
+        // Check whether the table name has the $binlog suffix
+        if (tableName.endsWith(FlinkCatalog.BINLOG_TABLE_SUFFIX)) {
+            return createBinlogTableSource(context, tableIdentifier, tableName);
+        }
+
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig tableOptions = helper.getOptions();
         validateSourceOptions(tableOptions);
@@ -327,4 +333,51 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
                 partitionDiscoveryIntervalMs,
                 catalogTableOptions);
     }
+
+    /** Creates a BinlogFlinkTableSource for $binlog virtual tables. */
+    private DynamicTableSource createBinlogTableSource(
+            Context context, ObjectIdentifier tableIdentifier, String tableName) {
+        // Extract the base table name by removing the $binlog suffix
+        String baseTableName =
+                tableName.substring(
+                        0, tableName.length() - FlinkCatalog.BINLOG_TABLE_SUFFIX.length());
+
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+
+        // tableOutputType: [_change_type, _log_offset, _commit_timestamp, before ROW<...>, after ROW<...>]
+        RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType();
+
+        Map<String, String> catalogTableOptions = context.getCatalogTable().getOptions();
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = helper.getOptions();
+        validateSourceOptions(tableOptions);
+
+        ZoneId timeZone =
+                FlinkConnectorOptionsUtils.getLocalTimeZone(
+                        context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
+        final FlinkConnectorOptionsUtils.StartupOptions startupOptions =
+                FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, timeZone);
+
+        // Check if the table is partitioned from the internal option
+        boolean isPartitioned =
+                tableOptions.get(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED);
+
+        long partitionDiscoveryIntervalMs =
+                tableOptions
+                        .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
+                        .toMillis();
+
+        return new BinlogFlinkTableSource(
+                TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
+                toFlussClientConfig(catalogTableOptions, context.getConfiguration()),
+                tableOutputType,
+                isPartitioned,
+                isStreamingMode,
+                startupOptions,
+                partitionDiscoveryIntervalMs,
+                catalogTableOptions);
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
similarity index 52%
copy from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
copy to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
index 3883c4308..a5ec91bf3 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
@@ -19,15 +19,13 @@ package org.apache.fluss.flink.source;
 
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
+import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
-import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;
 
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -37,23 +35,18 @@ import org.apache.flink.table.types.logical.LogicalType;
 
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
-/** A Flink table source for the $changelog virtual table. */
-public class ChangelogFlinkTableSource implements ScanTableSource {
+/** A Flink table source for the $binlog virtual table. */
+public class BinlogFlinkTableSource implements ScanTableSource {
 
     private final TablePath tablePath;
     private final Configuration flussConfig;
-    // The changelog output type (includes metadata columns: _change_type, _log_offset,
-    // _commit_timestamp)
-    private final org.apache.flink.table.types.logical.RowType changelogOutputType;
+    // The binlog output type (includes metadata + nested before/after ROW columns)
+    private final org.apache.flink.table.types.logical.RowType binlogOutputType;
+    // The data columns type extracted from the 'before' nested ROW
     private final org.apache.flink.table.types.logical.RowType dataColumnsType;
-    private final int[] partitionKeyIndexes;
+    private final boolean isPartitioned;
     private final boolean streaming;
     private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
     private final long scanPartitionDiscoveryIntervalMs;
@@ -65,77 +58,50 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
 
     @Nullable private Predicate partitionFilters;
 
-    private static final Set<String> METADATA_COLUMN_NAMES =
-            new HashSet<>(
-                    Arrays.asList(
-                            TableDescriptor.CHANGE_TYPE_COLUMN,
-                            TableDescriptor.LOG_OFFSET_COLUMN,
-                            TableDescriptor.COMMIT_TIMESTAMP_COLUMN));
-
-    public ChangelogFlinkTableSource(
+    public BinlogFlinkTableSource(
             TablePath tablePath,
             Configuration flussConfig,
-            org.apache.flink.table.types.logical.RowType changelogOutputType,
-            int[] partitionKeyIndexes,
+            org.apache.flink.table.types.logical.RowType binlogOutputType,
+            boolean isPartitioned,
             boolean streaming,
             FlinkConnectorOptionsUtils.StartupOptions startupOptions,
             long scanPartitionDiscoveryIntervalMs,
             Map<String, String> tableOptions) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
-        // The changelogOutputType already includes metadata columns from FlinkCatalog
-        this.changelogOutputType = changelogOutputType;
-        this.partitionKeyIndexes = partitionKeyIndexes;
+        this.binlogOutputType = binlogOutputType;
+        this.isPartitioned = isPartitioned;
         this.streaming = streaming;
         this.startupOptions = startupOptions;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.tableOptions = tableOptions;
 
-        // Extract data columns by filtering out metadata columns by name
-        this.dataColumnsType = extractDataColumnsType(changelogOutputType);
-        this.producedDataType = changelogOutputType;
-    }
-
-    /**
-     * Extracts the data columns type by removing the metadata columns from the changelog output
-     * type.
-     */
-    private org.apache.flink.table.types.logical.RowType extractDataColumnsType(
-            org.apache.flink.table.types.logical.RowType changelogType) {
-        // Filter out metadata columns by name
-        List<org.apache.flink.table.types.logical.RowType.RowField> dataFields =
-                changelogType.getFields().stream()
-                        .filter(field -> !METADATA_COLUMN_NAMES.contains(field.getName()))
-                        .collect(Collectors.toList());
-
-        return new org.apache.flink.table.types.logical.RowType(dataFields);
+        // Extract data columns from the 'before' nested ROW type (index 3)
+        // The binlog schema is: [_change_type, _log_offset, _commit_timestamp, before, after]
+        this.dataColumnsType =
+                (org.apache.flink.table.types.logical.RowType) binlogOutputType.getTypeAt(3);
+        this.producedDataType = binlogOutputType;
     }
 
     @Override
     public ChangelogMode getChangelogMode() {
-        // The $changelog virtual table always produces INSERT-only records.
-        // All change types (+I, -U, +U, -D) are flattened into regular rows
         return ChangelogMode.insertOnly();
     }
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        // Create the Fluss row type for the data columns (without metadata)
+        // Create the Fluss row type for the data columns (the original table columns)
         RowType flussRowType = FlinkConversions.toFlussRowType(dataColumnsType);
         if (projectedFields != null) {
-            // Adjust projection to account for metadata columns
-            // TODO: Handle projection properly with metadata columns
             flussRowType = flussRowType.project(projectedFields);
         }
-        // to capture all change types (+I, -U, +U, -D).
-        // FULL mode reads snapshot first (no change types), so we use EARLIEST for log-only
-        // reading.
-        // LATEST mode is supported for real-time changelog streaming from current position.
+
+        // Determine the offsets initializer based on startup mode
         OffsetsInitializer offsetsInitializer;
         switch (startupOptions.startupMode) {
             case EARLIEST:
             case FULL:
-                // For changelog, FULL mode should read all log records from beginning
+                // For binlog, read all log records from the beginning
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
@@ -150,52 +116,33 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
                         "Unsupported startup mode: " + startupOptions.startupMode);
         }
 
-        // Create the source with the changelog deserialization schema
+        // Create the source with the binlog deserialization schema
         FlinkSource<RowData> source =
                 new FlinkSource<>(
                         flussConfig,
                         tablePath,
-                        // Changelog/binlog virtual tables are purely log-based and don't have a
-                        // primary key, setting hasPrimaryKey "false" to ensure the enumerator
-                        // fetches log-only splits (not snapshot splits), which is the correct
-                        // behavior for virtual tables.
                         false,
-                        isPartitioned(),
+                        isPartitioned,
                         flussRowType,
                         projectedFields,
                         offsetsInitializer,
                         scanPartitionDiscoveryIntervalMs,
-                        new ChangelogDeserializationSchema(),
+                        new BinlogDeserializationSchema(),
                         streaming,
                         partitionFilters,
-                        null); // Lake source not supported
-
-        if (!streaming) {
-            // Batch mode - changelog virtual tables read from log, not data lake
-            return new SourceProvider() {
-                @Override
-                public boolean isBounded() {
-                    return true;
-                }
-
-                @Override
-                public Source<RowData, ?, ?> createSource() {
-                    return source;
-                }
-            };
-        } else {
-            return SourceProvider.of(source);
-        }
+                        null);
+
+        return SourceProvider.of(source);
     }
 
     @Override
     public DynamicTableSource copy() {
-        ChangelogFlinkTableSource copy =
-                new ChangelogFlinkTableSource(
+        BinlogFlinkTableSource copy =
+                new BinlogFlinkTableSource(
                         tablePath,
                         flussConfig,
-                        changelogOutputType,
-                        partitionKeyIndexes,
+                        binlogOutputType,
+                        isPartitioned,
                         streaming,
                         startupOptions,
                         scanPartitionDiscoveryIntervalMs,
@@ -208,13 +155,9 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
 
     @Override
     public String asSummaryString() {
-        return "FlussChangelogTableSource";
+        return "FlussBinlogTableSource";
     }
 
-    // TODO: Implement projection pushdown handling for metadata columns
+    // TODO: Implement projection pushdown handling for nested before/after columns
     // TODO: Implement filter pushdown
-
-    private boolean isPartitioned() {
-        return partitionKeyIndexes.length > 0;
-    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
index 3883c4308..a034807d7 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
@@ -27,7 +27,6 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;
 
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -170,22 +169,7 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
                         partitionFilters,
                         null); // Lake source not supported
 
-        if (!streaming) {
-            // Batch mode - changelog virtual tables read from log, not data lake
-            return new SourceProvider() {
-                @Override
-                public boolean isBounded() {
-                    return true;
-                }
-
-                @Override
-                public Source<RowData, ?, ?> createSource() {
-                    return source;
-                }
-            };
-        } else {
-            return SourceProvider.of(source);
-        }
+        return SourceProvider.of(source);
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java
new file mode 100644
index 000000000..1114febb9
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.deserializer;
+
+import org.apache.fluss.flink.utils.BinlogRowConverter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import javax.annotation.Nullable;
+
+import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
+
+/**
+ * A deserialization schema that converts {@link LogRecord} objects to Flink's {@link RowData}
+ * format with nested before/after row structure for the $binlog virtual table.
+ *
+ * <p>This schema is stateful: it buffers UPDATE_BEFORE (-U) records and returns {@code null} for
+ * them. When the subsequent UPDATE_AFTER (+U) record arrives, it merges both into a single binlog
+ * row. The {@link org.apache.fluss.flink.source.emitter.FlinkRecordEmitter} handles null returns by
+ * skipping emission.
+ */
+public class BinlogDeserializationSchema implements FlussDeserializationSchema<RowData> {
+
+    /**
+     * Converter responsible for transforming Fluss row data into Flink's {@link RowData} format
+     * with nested before/after structure. Initialized during {@link #open(InitializationContext)}.
+     */
+    private transient BinlogRowConverter converter;
+
+    /** Creates a new BinlogDeserializationSchema. */
+    public BinlogDeserializationSchema() {}
+
+    /** Initializes the deserialization schema. */
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        if (converter == null) {
+            this.converter = new BinlogRowConverter(context.getRowSchema());
+        }
+    }
+
+    /**
+     * Deserializes a {@link LogRecord} into a Flink {@link RowData} object with nested before/after
+     * structure.
+     */
+    @Override
+    @Nullable
+    public RowData deserialize(LogRecord record) throws Exception {
+        if (converter == null) {
+            throw new IllegalStateException(
+                    "Converter not initialized. The open() method must be called before deserializing records.");
+        }
+        return converter.toBinlogRowData(record);
+    }
+
+    /**
+     * Returns the TypeInformation for the produced {@link RowData} type including nested
+     * before/after ROW columns.
+     */
+    @Override
+    public TypeInformation<RowData> getProducedType(RowType rowSchema) {
+        // Build the output type with nested before/after ROW columns
+        org.apache.flink.table.types.logical.RowType outputType =
+                BinlogRowConverter.buildBinlogRowType(toFlinkRowType(rowSchema));
+        return InternalTypeInfo.of(outputType);
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
index 82a3ed87e..ba4c9d013 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
@@ -73,8 +73,20 @@ public class FlinkRecordEmitter<OUT> implements RecordEmitter<RecordAndPos, OUT,
             }
             processAndEmitRecord(scanRecord, sourceOutput);
         } else if (splitState.isLogSplitState()) {
-            splitState.asLogSplitState().setNextOffset(recordAndPosition.record().logOffset() + 1);
-            processAndEmitRecord(recordAndPosition.record(), sourceOutput);
+            // Attempt to process and emit the record.
+            // For $binlog, this returns true only when a complete row (or the final part of
+            // a split) is emitted.
+            boolean emitted = processAndEmitRecord(recordAndPosition.record(), sourceOutput);
+
+            if (emitted) {
+                // Only advance the offset in state if the record was successfully emitted.
+                // This ensures that if a crash occurs mid-update (between BEFORE and AFTER),
+                // the source will re-read the same log offset upon recovery,
+                // allowing the BinlogDeserializationSchema to correctly reconstruct the state.
+                splitState
+                        .asLogSplitState()
+                        .setNextOffset(recordAndPosition.record().logOffset() + 1);
+            }
         } else if (splitState.isLakeSplit()) {
             if (lakeRecordRecordEmitter == null) {
                 lakeRecordRecordEmitter = new LakeRecordRecordEmitter<>(this::processAndEmitRecord);
@@ -85,7 +97,13 @@ public class FlinkRecordEmitter<OUT> implements RecordEmitter<RecordAndPos, OUT,
         }
     }
 
-    private void processAndEmitRecord(ScanRecord scanRecord, SourceOutput<OUT> sourceOutput) {
+    /**
+     * Processes and emits a record.
+     *
+     * @return true if a record was emitted, false if deserialize returned null (e.g., for $binlog
+     *     UPDATE_BEFORE records that are buffered pending their UPDATE_AFTER pair)
+     */
+    private boolean processAndEmitRecord(ScanRecord scanRecord, SourceOutput<OUT> sourceOutput) {
         OUT record;
         try {
             record = deserializationSchema.deserialize(scanRecord);
@@ -102,6 +120,8 @@ public class FlinkRecordEmitter<OUT> implements RecordEmitter<RecordAndPos, OUT,
             } else {
                 sourceOutput.collect(record);
             }
+            return true;
         }
+        return false;
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java
new file mode 100644
index 000000000..3a9f2c941
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with nested
+ * before/after row structure for the $binlog virtual table.
+ */
+public class BinlogRowConverter implements RecordToFlinkRowConverter {
+
+    private final FlussRowToFlinkRowConverter baseConverter;
+    private final org.apache.flink.table.types.logical.RowType producedType;
+
+    /**
+     * Buffer for the UPDATE_BEFORE (-U) record pending merge with the next 
UPDATE_AFTER (+U)
+     * record. Null when no update is in progress.
+     */
+    @Nullable private LogRecord pendingUpdateBefore;
+
+    /** Creates a new BinlogRowConverter. */
+    public BinlogRowConverter(RowType rowType) {
+        this.baseConverter = new FlussRowToFlinkRowConverter(rowType);
+        this.producedType = buildBinlogRowType(FlinkConversions.toFlinkRowType(rowType));
+    }
+
+    /** Converts a LogRecord to a binlog RowData with nested before/after structure. */
+    @Nullable
+    public RowData toBinlogRowData(LogRecord record) {
+        ChangeType changeType = record.getChangeType();
+
+        switch (changeType) {
+            case INSERT:
+                return buildBinlogRow(
+                        "insert",
+                        record.logOffset(),
+                        record.timestamp(),
+                        null,
+                        baseConverter.toFlinkRowData(record.getRow()));
+
+            case UPDATE_BEFORE:
+                // Buffer the -U record and return null.
+                // FlinkRecordEmitter.processAndEmitRecord() skips null results.
+                this.pendingUpdateBefore = record;
+                return null;
+
+            case UPDATE_AFTER:
+                // Merge with the buffered -U record
+                if (pendingUpdateBefore == null) {
+                    throw new IllegalStateException(
+                            "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. "
+                                    + "This indicates a corrupted log sequence.");
+                }
+                RowData beforeRow = baseConverter.toFlinkRowData(pendingUpdateBefore.getRow());
+                RowData afterRow = baseConverter.toFlinkRowData(record.getRow());
+                // Use offset and timestamp from the -U record (first entry of the update pair)
+                long offset = pendingUpdateBefore.logOffset();
+                long timestamp = pendingUpdateBefore.timestamp();
+                pendingUpdateBefore = null;
+                return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow);
+
+            case DELETE:
+                return buildBinlogRow(
+                        "delete",
+                        record.logOffset(),
+                        record.timestamp(),
+                        baseConverter.toFlinkRowData(record.getRow()),
+                        null);
+
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "$binlog virtual table does not support change type: %s. "
+                                        + "$binlog is only supported for primary key tables.",
+                                changeType));
+        }
+    }
+
+    @Override
+    @Nullable
+    public RowData convert(LogRecord record) {
+        return toBinlogRowData(record);
+    }
+
+    @Override
+    public org.apache.flink.table.types.logical.RowType getProducedType() {
+        return producedType;
+    }
+
+    /**
+     * Builds a binlog row with 5 fields: _change_type, _log_offset, _commit_timestamp, before,
+     * after.
+     */
+    private RowData buildBinlogRow(
+            String changeType,
+            long offset,
+            long timestamp,
+            @Nullable RowData before,
+            @Nullable RowData after) {
+        GenericRowData row = new GenericRowData(5);
+        row.setField(0, StringData.fromString(changeType));
+        row.setField(1, offset);
+        row.setField(2, TimestampData.fromEpochMillis(timestamp));
+        row.setField(3, before);
+        row.setField(4, after);
+        row.setRowKind(RowKind.INSERT);
+        return row;
+    }
+
+    /**
+     * Builds the Flink RowType for the binlog virtual table with nested 
before/after ROW columns.
+     */
+    public static org.apache.flink.table.types.logical.RowType buildBinlogRowType(
+            org.apache.flink.table.types.logical.RowType originalType) {
+        List<org.apache.flink.table.types.logical.RowType.RowField> fields = new ArrayList<>();
+
+        // Add metadata columns
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 6)));
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false)));
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.COMMIT_TIMESTAMP_COLUMN,
+                        new LocalZonedTimestampType(false, 3)));
+
+        // Add nested before and after ROW columns (nullable at the ROW level)
+        org.apache.flink.table.types.logical.RowType nullableRowType =
+                new org.apache.flink.table.types.logical.RowType(true, originalType.getFields());
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.BEFORE_COLUMN, nullableRowType));
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.AFTER_COLUMN, nullableRowType));
+
+        return new org.apache.flink.table.types.logical.RowType(fields);
+    }
+}
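The update path above is the heart of the converter: a primary key update
arrives in the log as an UPDATE_BEFORE record followed by an UPDATE_AFTER
record, and only the second call produces output. A minimal standalone sketch
of that pairing, using plain Java types rather than Fluss classes (all names
here are illustrative):

    import java.util.Optional;

    class UpdatePairingSketch {
        enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

        static final class BinlogRow {
            final String changeType;
            final Object before;
            final Object after;

            BinlogRow(String changeType, Object before, Object after) {
                this.changeType = changeType;
                this.before = before;
                this.after = after;
            }
        }

        // Buffered -U image; null when no update is in progress.
        private Object pendingBefore;

        Optional<BinlogRow> convert(ChangeType type, Object row) {
            switch (type) {
                case INSERT:
                    return Optional.of(new BinlogRow("insert", null, row));
                case UPDATE_BEFORE:
                    // Buffer the old image and emit nothing yet.
                    pendingBefore = row;
                    return Optional.empty();
                case UPDATE_AFTER:
                    // Merge the buffered old image with the new image.
                    BinlogRow merged = new BinlogRow("update", pendingBefore, row);
                    pendingBefore = null;
                    return Optional.of(merged);
                case DELETE:
                    return Optional.of(new BinlogRow("delete", row, null));
                default:
                    throw new IllegalArgumentException("Unknown change type: " + type);
            }
        }
    }

Because nothing is emitted for the -U record, the FlinkRecordEmitter change
earlier in this patch only advances the split offset once the merged row has
been emitted, which keeps recovery consistent across the -U/+U boundary.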
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
index 6b8081a6d..a2b339da5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
@@ -86,8 +86,19 @@ public class ChangelogRowConverter implements RecordToFlinkRowConverter {
 
     /** Converts a Fluss ChangeType to its string representation for the changelog virtual table. */
     private String convertChangeTypeToString(ChangeType changeType) {
-        // Use the short string representation from ChangeType
-        return changeType.shortString();
+        switch (changeType) {
+            case APPEND_ONLY:
+            case INSERT:
+                return "insert";
+            case UPDATE_BEFORE:
+                return "update_before";
+            case UPDATE_AFTER:
+                return "update_after";
+            case DELETE:
+                return "delete";
+            default:
+                throw new IllegalArgumentException("Unknown change type: " + changeType);
+        }
     }
 
     /**
@@ -103,7 +114,7 @@ public class ChangelogRowConverter implements RecordToFlinkRowConverter {
         // Add metadata columns first (using centralized constants from TableDescriptor)
         fields.add(
                 new org.apache.flink.table.types.logical.RowType.RowField(
-                        TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 2)));
+                        TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 13)));
         fields.add(
                 new org.apache.flink.table.types.logical.RowType.RowField(
                         TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false)));
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 58e78ccd2..f4ffd61de 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -914,7 +915,7 @@ abstract class FlinkCatalogITCase {
         // Verify options are inherited from base table
         assertThat(changelogTable.getOptions()).containsEntry("bucket.num", "1");
 
-        // Verify $changelog log tables (append-only with +A change type)
+        // Verify $changelog log tables (append-only with insert change type)
         tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name 
STRING)");
 
         CatalogTable logChangelogTable =
@@ -935,6 +936,55 @@ abstract class FlinkCatalogITCase {
         
assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
     }
 
+    @Test
+    void testGetBinlogVirtualTable() throws Exception {
+        // Create a primary key table with partition
+        tEnv.executeSql(
+                "CREATE TABLE pk_table_for_binlog ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING NOT NULL,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id, name) NOT ENFORCED"
+                        + ") PARTITIONED BY (name) "
+                        + "WITH ('bucket.num' = '1')");
+
+        // Get the $binlog virtual table via catalog API
+        CatalogTable binlogTable =
+                (CatalogTable)
+                        catalog.getTable(new ObjectPath(DEFAULT_DB, "pk_table_for_binlog$binlog"));
+
+        // use string representation for assertion to simplify the unresolved schema comparison
+        assertThat(binlogTable.getUnresolvedSchema().toString())
+                .isEqualTo(
+                        "(\n"
+                                + "  `_change_type` STRING NOT NULL,\n"
+                                + "  `_log_offset` BIGINT NOT NULL,\n"
+                                + "  `_commit_timestamp` TIMESTAMP_LTZ(3) NOT NULL,\n"
+                                + "  `before` [ROW<id INT NOT NULL, name STRING NOT NULL, amount BIGINT>],\n"
+                                + "  `after` [ROW<id INT NOT NULL, name STRING NOT NULL, amount BIGINT>]\n"
+                                + ")");
+
+        // Binlog virtual tables have empty partition keys (columns are nested)
+        assertThat(binlogTable.getPartitionKeys()).isEmpty();
+
+        // Partition info is stored as an internal boolean flag
+        assertThat(binlogTable.getOptions())
+                .containsEntry(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(), "true");
+
+        // Verify options are inherited from base table
+        assertThat(binlogTable.getOptions()).containsEntry("bucket.num", "1");
+
+        // Verify $binlog is NOT supported for log tables (no primary key)
+        tEnv.executeSql("CREATE TABLE log_table_for_binlog (id INT, name 
STRING)");
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(DEFAULT_DB, "log_table_for_binlog$binlog")))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("only supported for primary key tables");
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..2fa306f88
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration test for $binlog virtual table functionality. */
+abstract class BinlogVirtualTableITCase extends AbstractTestBase {
+
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(new Configuration())
+                    .setNumOfTabletServers(1)
+                    .setClock(CLOCK)
+                    .build();
+
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "test_binlog_db";
+    protected StreamExecutionEnvironment execEnv;
+    protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    void before() {
+        // Initialize Flink environment
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+
+        // Initialize catalog and database
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    /** Deletes rows from a primary key table via the upsert writer's delete API. */
+    protected static void deleteRows(
+            Connection connection, TablePath tablePath, List<InternalRow> 
rows) throws Exception {
+        try (Table table = connection.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                writer.delete(row);
+            }
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testDescribeBinlogTable() throws Exception {
+        // Create a table with various data types to test complex schema
+        tEnv.executeSql(
+                "CREATE TABLE describe_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Test DESCRIBE on binlog virtual table
+        CloseableIterator<Row> describeResult =
+                tEnv.executeSql("DESCRIBE describe_test$binlog").collect();
+
+        List<String> schemaRows = new ArrayList<>();
+        while (describeResult.hasNext()) {
+            schemaRows.add(describeResult.next().toString());
+        }
+
+        // Should have 5 columns: _change_type, _log_offset, 
_commit_timestamp, before, after
+        assertThat(schemaRows).hasSize(5);
+
+        // Verify metadata columns are listed first
+        assertThat(schemaRows.get(0))
+                .isEqualTo("+I[_change_type, STRING, false, null, null, 
null]");
+        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, 
false, null, null, null]");
+        assertThat(schemaRows.get(2))
+                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, 
null, null, null]");
+
+        // Verify before and after are ROW types with original columns
+        assertThat(schemaRows.get(3))
+                .isEqualTo(
+                        "+I[before, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+        assertThat(schemaRows.get(4))
+                .isEqualTo(
+                        "+I[after, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+    }
+
+    @Test
+    public void testBinlogUnsupportedForLogTable() throws Exception {
+        // Create a log table (no primary key)
+        tEnv.executeSql(
+                "CREATE TABLE log_table ("
+                        + "  event_id INT,"
+                        + "  event_type STRING"
+                        + ") WITH ('bucket.num' = '1')");
+
+        // $binlog should fail for log tables
+        assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE 
log_table$binlog").collect())
+                .hasMessageContaining("only supported for primary key tables");
+    }
+
+    @Test
+    public void testBinlogWithAllChangeTypes() throws Exception {
+        // Create a primary key table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test");
+
+        // Start binlog scan
+        String query =
+                "SELECT _change_type, _log_offset, "
+                        + "before.id, before.name, before.amount, "
+                        + "after.id, after.name, after.amount "
+                        + "FROM binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Test INSERT
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(
+                conn,
+                tablePath,
+                Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)),
+                false);
+
+        // Collect inserts - each INSERT produces one binlog row
+        List<String> insertResults = collectRowsWithTimeout(rowIter, 2, false);
+        assertThat(insertResults).hasSize(2);
+
+        // INSERT: before=null, after=row data
+        // Format: +I[_change_type, _log_offset, before.id, before.name, 
before.amount,
+        //            after.id, after.name, after.amount]
+        assertThat(insertResults.get(0))
+                .isEqualTo("+I[insert, 0, null, null, null, 1, Item-1, 100]");
+        assertThat(insertResults.get(1))
+                .isEqualTo("+I[insert, 1, null, null, null, 2, Item-2, 200]");
+
+        // Test UPDATE - should merge -U and +U into a single binlog row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 
150L)), false);
+
+        // UPDATE produces ONE binlog row (not the two rows a changelog read would emit)
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, false);
+        assertThat(updateResults).hasSize(1);
+
+        // UPDATE: before=old row, after=new row, offset=from -U record
+        assertThat(updateResults.get(0))
+                .isEqualTo("+I[update, 2, 1, Item-1, 100, 1, Item-1-Updated, 
150]");
+
+        // Test DELETE
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L)));
+
+        // DELETE produces one binlog row
+        List<String> deleteResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(deleteResults).hasSize(1);
+
+        // DELETE: before=row data, after=null
+        assertThat(deleteResults.get(0))
+                .isEqualTo("+I[delete, 4, 2, Item-2, 200, null, null, null]");
+    }
+
+    @Test
+    public void testBinlogSelectStar() throws Exception {
+        // Test SELECT *, which returns the full nested binlog structure
+        tEnv.executeSql(
+                "CREATE TABLE star_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test");
+
+        String query = "SELECT * FROM star_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Insert a row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false);
+
+        List<String> results = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(results).hasSize(1);
+
+        // SELECT * returns: _change_type, _log_offset, _commit_timestamp, 
before, after
+        // before is null for INSERT, after contains the row
+        assertThat(results.get(0))
+                .isEqualTo("+I[insert, 0, 1970-01-01T00:00:01Z, null, +I[1, 
Alice]]");
+    }
+
+    @Test
+    public void testBinlogWithPartitionedTable() throws Exception {
+        // Create a partitioned primary key table
+        tEnv.executeSql(
+                "CREATE TABLE partitioned_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  region STRING NOT NULL,"
+                        + "  PRIMARY KEY (id, region) NOT ENFORCED"
+                        + ") PARTITIONED BY (region) WITH ('bucket.num' = 
'1')");
+
+        // Insert data into different partitions using Flink SQL
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql(
+                        "INSERT INTO partitioned_binlog_test VALUES "
+                                + "(1, 'Item-1', 'us'), "
+                                + "(2, 'Item-2', 'eu')")
+                .await();
+
+        // Query binlog with nested field access
+        String query =
+                "SELECT _change_type, after.id, after.name, after.region "
+                        + "FROM partitioned_binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        List<String> results = collectRowsWithTimeout(rowIter, 2, false);
+        // Sort results for deterministic assertion (partitions may return in 
any order)
+        Collections.sort(results);
+        assertThat(results)
+                .isEqualTo(Arrays.asList("+I[insert, 1, Item-1, us]", 
"+I[insert, 2, Item-2, eu]"));
+
+        // Update a record in a specific partition
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql("INSERT INTO partitioned_binlog_test VALUES (1, 
'Item-1-Updated', 'us')")
+                .await();
+
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(updateResults).hasSize(1);
+        assertThat(updateResults.get(0)).isEqualTo("+I[update, 1, 
Item-1-Updated, us]");
+    }
+
+    @Test
+    public void testBinlogScanStartupMode() throws Exception {
+        // Create a primary key table with 1 bucket
+        tEnv.executeSql(
+                "CREATE TABLE startup_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_binlog_test");
+
+        // Write first batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3")), false);
+
+        // Write second batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(4, "v4"), row(5, "v5")), 
false);
+
+        // Test scan.startup.mode='earliest' - should read all records from the beginning
+        String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 
'earliest') */";
+        String queryEarliest =
+                "SELECT _change_type, after.id, after.name FROM 
startup_binlog_test$binlog"
+                        + optionsEarliest;
+        CloseableIterator<Row> rowIterEarliest = 
tEnv.executeSql(queryEarliest).collect();
+        List<String> earliestResults = collectRowsWithTimeout(rowIterEarliest, 
5, true);
+        assertThat(earliestResults)
+                .isEqualTo(
+                        Arrays.asList(
+                                "+I[insert, 1, v1]",
+                                "+I[insert, 2, v2]",
+                                "+I[insert, 3, v3]",
+                                "+I[insert, 4, v4]",
+                                "+I[insert, 5, v5]"));
+
+        // Test scan.startup.mode='timestamp' - should read from a specific timestamp
+        String optionsTimestamp =
+                " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 
'scan.startup.timestamp' = '150') */";
+        String queryTimestamp =
+                "SELECT _change_type, after.id, after.name FROM 
startup_binlog_test$binlog"
+                        + optionsTimestamp;
+        CloseableIterator<Row> rowIterTimestamp = 
tEnv.executeSql(queryTimestamp).collect();
+        List<String> timestampResults = 
collectRowsWithTimeout(rowIterTimestamp, 2, true);
+        assertThat(timestampResults)
+                .isEqualTo(Arrays.asList("+I[insert, 4, v4]", "+I[insert, 5, 
v5]"));
+    }
+}
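
For reference, the same read path can be driven outside the test harness by a small Flink job. The following is a minimal sketch under assumptions: the catalog name, database, table name, and bootstrap address ('localhost:9123') are placeholders; only the CREATE CATALOG shape, the $binlog suffix, and the OPTIONS hint syntax are taken from the tests above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;

public class BinlogReadSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        // Register a Fluss catalog; 'localhost:9123' is a placeholder bootstrap address.
        tEnv.executeSql(
                String.format(
                        "CREATE CATALOG fluss_catalog WITH ('type' = 'fluss', '%s' = '%s')",
                        BOOTSTRAP_SERVERS.key(), "localhost:9123"));
        tEnv.executeSql("USE CATALOG fluss_catalog");

        // One row per insert/update/delete, with the old and new images nested
        // under 'before' and 'after'.
        tEnv.executeSql(
                        "SELECT _change_type, _log_offset, before, after "
                                + "FROM my_db.my_pk_table$binlog "
                                + "/*+ OPTIONS('scan.startup.mode' = 'earliest') */")
                .print();
    }
}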
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index 2b9111af8..342772996 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -97,7 +97,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                         "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         tEnv.executeSql("use catalog " + CATALOG_NAME);
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
         tEnv.executeSql("create database " + DEFAULT_DB);
         tEnv.useDatabase(DEFAULT_DB);
         // reset clock before each test
@@ -220,16 +220,17 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         assertThat(results).hasSize(2);
 
         // Format: +I[_change_type, _log_offset, _commit_timestamp, event_id, 
event_type]
-        // Log tables use +A (append-only) change type
-        assertThat(results.get(0)).isEqualTo("+I[+A, 0, 1970-01-01T00:00:01Z, 
1, click]");
-        assertThat(results.get(1)).isEqualTo("+I[+A, 1, 1970-01-01T00:00:01Z, 
2, view]");
+        // Log tables use the insert (append-only) change type
+        assertThat(results.get(0)).isEqualTo("+I[insert, 0, 
1970-01-01T00:00:01Z, 1, click]");
+        assertThat(results.get(1)).isEqualTo("+I[insert, 1, 
1970-01-01T00:00:01Z, 2, view]");
 
         // Insert more data with new timestamp
         CLOCK.advanceTime(Duration.ofMillis(1000));
         writeRows(conn, tablePath, Arrays.asList(row(3, "purchase")), true);
 
         List<String> moreResults = collectRowsWithTimeout(rowIter, 1, true);
-        assertThat(moreResults.get(0)).isEqualTo("+I[+A, 2, 
1970-01-01T00:00:02Z, 3, purchase]");
+        assertThat(moreResults.get(0))
+                .isEqualTo("+I[insert, 2, 1970-01-01T00:00:02Z, 3, purchase]");
     }
 
     @Test
@@ -254,7 +255,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         CLOCK.advanceTime(Duration.ofMillis(100));
         writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1", 100L, 
"Desc-1")), false);
         List<String> insertResult = collectRowsWithTimeout(rowIter, 1, false);
-        assertThat(insertResult.get(0)).isEqualTo("+I[+I, 1, Item-1]");
+        assertThat(insertResult.get(0)).isEqualTo("+I[insert, 1, Item-1]");
 
         // Test UPDATE
         CLOCK.advanceTime(Duration.ofMillis(100));
@@ -264,15 +265,15 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                 Arrays.asList(row(1, "Item-1-Updated", 150L, 
"Desc-1-Updated")),
                 false);
         List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
-        assertThat(updateResults.get(0)).isEqualTo("+I[-U, 1, Item-1]");
-        assertThat(updateResults.get(1)).isEqualTo("+I[+U, 1, 
Item-1-Updated]");
+        assertThat(updateResults.get(0)).isEqualTo("+I[update_before, 1, 
Item-1]");
+        assertThat(updateResults.get(1)).isEqualTo("+I[update_after, 1, 
Item-1-Updated]");
 
         // Test DELETE
         CLOCK.advanceTime(Duration.ofMillis(100));
         deleteRows(
                 conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L, 
"Desc-1-Updated")));
         List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
-        assertThat(deleteResult.get(0)).isEqualTo("+I[-D, 1, Item-1-Updated]");
+        assertThat(deleteResult.get(0)).isEqualTo("+I[delete, 1, 
Item-1-Updated]");
     }
 
     @Test
@@ -304,20 +305,20 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
 
         // With ManualClock and 1 bucket, we can assert exact row values
         // Format: +I[_change_type, _log_offset, _commit_timestamp, id, name, 
amount]
-        assertThat(results.get(0)).isEqualTo("+I[+I, 0, 1970-01-01T00:00:01Z, 
1, Item-1, 100]");
-        assertThat(results.get(1)).isEqualTo("+I[+I, 1, 1970-01-01T00:00:01Z, 
2, Item-2, 200]");
+        assertThat(results.get(0)).isEqualTo("+I[insert, 0, 
1970-01-01T00:00:01Z, 1, Item-1, 100]");
+        assertThat(results.get(1)).isEqualTo("+I[insert, 1, 
1970-01-01T00:00:01Z, 2, Item-2, 200]");
 
         // Test UPDATE operation with new timestamp
         CLOCK.advanceTime(Duration.ofMillis(1000));
         writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 
150L)), false);
 
-        // Collect update records (should get -U and +U)
+        // Collect update records (should get update_before and update_after)
         List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
         assertThat(updateResults).hasSize(2);
         assertThat(updateResults.get(0))
-                .isEqualTo("+I[-U, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]");
+                .isEqualTo("+I[update_before, 2, 1970-01-01T00:00:02Z, 1, 
Item-1, 100]");
         assertThat(updateResults.get(1))
-                .isEqualTo("+I[+U, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated, 
150]");
+                .isEqualTo("+I[update_after, 3, 1970-01-01T00:00:02Z, 1, 
Item-1-Updated, 150]");
 
         // Test DELETE operation with new timestamp
         CLOCK.advanceTime(Duration.ofMillis(1000));
@@ -327,7 +328,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
         assertThat(deleteResult).hasSize(1);
         assertThat(deleteResult.get(0))
-                .isEqualTo("+I[-D, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]");
+                .isEqualTo("+I[delete, 4, 1970-01-01T00:00:03Z, 2, Item-2, 
200]");
     }
 
     @Test
@@ -361,7 +362,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         assertThat(earliestResults).hasSize(5);
         // All should be INSERT change types
         for (String result : earliestResults) {
-            assertThat(result).startsWith("+I[+I,");
+            assertThat(result).startsWith("+I[insert,");
         }
 
         // 2. Test scan.startup.mode='timestamp' - should read records from 
specific timestamp
@@ -374,9 +375,9 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         assertThat(timestampResults).hasSize(2);
         // Should contain records from batch2 only
         assertThat(timestampResults)
-                .containsExactlyInAnyOrder(
-                        "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
-                        "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+                .containsExactly(
+                        "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
+                        "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
     }
 
     @Test
@@ -406,15 +407,19 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         // Collect initial inserts
         List<String> results = collectRowsWithTimeout(rowIter, 3, false);
         assertThat(results)
-                .containsExactlyInAnyOrder(
-                        "+I[+I, 1, Item-1, us]", "+I[+I, 2, Item-2, us]", 
"+I[+I, 3, Item-3, eu]");
+                .containsExactly(
+                        "+I[insert, 1, Item-1, us]",
+                        "+I[insert, 2, Item-2, us]",
+                        "+I[insert, 3, Item-3, eu]");
 
         // Update a record in a specific partition
         CLOCK.advanceTime(Duration.ofMillis(100));
         tEnv.executeSql("INSERT INTO partitioned_test VALUES (1, 
'Item-1-Updated', 'us')").await();
         List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
         assertThat(updateResults)
-                .containsExactly("+I[-U, 1, Item-1, us]", "+I[+U, 1, 
Item-1-Updated, us]");
+                .containsExactly(
+                        "+I[update_before, 1, Item-1, us]",
+                        "+I[update_after, 1, Item-1-Updated, us]");
 
         rowIter.close();
     }
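
The assertion updates in this test (and in ChangelogRowConverterTest further down) reflect the move from RowKind shorthands (+I/-U/+U/-D/+A) to descriptive _change_type strings. Below is a rough sketch of the mapping the tests now expect; it is a hypothetical helper, not the converter's actual code, with APPEND_ONLY records from log tables reported as plain inserts.

import org.apache.fluss.record.ChangeType;

final class ChangeTypeStrings {
    // Hypothetical helper: maps Fluss change types to the _change_type strings
    // asserted by the virtual-table tests.
    static String asMetadataString(ChangeType changeType) {
        switch (changeType) {
            case APPEND_ONLY:
            case INSERT:
                return "insert";
            case UPDATE_BEFORE:
                return "update_before";
            case UPDATE_AFTER:
                return "update_after";
            case DELETE:
                return "delete";
            default:
                throw new IllegalArgumentException("Unknown change type: " + changeType);
        }
    }
}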
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
new file mode 100644
index 000000000..a09dd7c41
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link BinlogRowConverter}. */
+class BinlogRowConverterTest {
+
+    private RowType testRowType;
+    private BinlogRowConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        // Create a simple test table schema: (id INT, name STRING, amount 
BIGINT)
+        testRowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("amount", DataTypes.BIGINT())
+                        .build();
+
+        converter = new BinlogRowConverter(testRowType);
+    }
+
+    @Test
+    void testConvertInsertRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1000L, 1, 
"Alice", 5000L);
+
+        RowData result = converter.convert(record);
+
+        // Verify row kind is always INSERT for virtual tables
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert"));
+        assertThat(result.getLong(1)).isEqualTo(100L); // log offset
+        assertThat(result.getTimestamp(2, 
3)).isEqualTo(TimestampData.fromEpochMillis(1000L));
+
+        // Verify before is null for INSERT
+        assertThat(result.isNullAt(3)).isTrue();
+
+        // Verify after contains the row data
+        RowData afterRow = result.getRow(4, 3);
+        assertThat(afterRow).isNotNull();
+        assertThat(afterRow.getInt(0)).isEqualTo(1); // id
+        assertThat(afterRow.getString(1).toString()).isEqualTo("Alice"); // 
name
+        assertThat(afterRow.getLong(2)).isEqualTo(5000L); // amount
+    }
+
+    @Test
+    void testConvertUpdateMerge() throws Exception {
+        // Send -U (UPDATE_BEFORE) - should return null (buffered)
+        LogRecord beforeRecord =
+                createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, 
"Bob", 3000L);
+        RowData beforeResult = converter.convert(beforeRecord);
+        assertThat(beforeResult).isNull();
+
+        // Send +U (UPDATE_AFTER) - should return merged row
+        LogRecord afterRecord =
+                createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, 
"Bob-Updated", 4000L);
+        RowData result = converter.convert(afterRecord);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update"));
+        // Offset and timestamp should be from the -U record (the first entry of the update pair)
+        assertThat(result.getLong(1)).isEqualTo(200L);
+        assertThat(result.getTimestamp(2, 
3)).isEqualTo(TimestampData.fromEpochMillis(2000L));
+
+        // Verify before contains old data
+        RowData beforeRow = result.getRow(3, 3);
+        assertThat(beforeRow).isNotNull();
+        assertThat(beforeRow.getInt(0)).isEqualTo(2);
+        assertThat(beforeRow.getString(1).toString()).isEqualTo("Bob");
+        assertThat(beforeRow.getLong(2)).isEqualTo(3000L);
+
+        // Verify after contains new data
+        RowData afterRow = result.getRow(4, 3);
+        assertThat(afterRow).isNotNull();
+        assertThat(afterRow.getInt(0)).isEqualTo(2);
+        assertThat(afterRow.getString(1).toString()).isEqualTo("Bob-Updated");
+        assertThat(afterRow.getLong(2)).isEqualTo(4000L);
+    }
+
+    @Test
+    void testConvertDeleteRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3000L, 3, 
"Charlie", 1000L);
+
+        RowData result = converter.convert(record);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete"));
+        assertThat(result.getLong(1)).isEqualTo(300L);
+        assertThat(result.getTimestamp(2, 
3)).isEqualTo(TimestampData.fromEpochMillis(3000L));
+
+        // Verify before contains the deleted row data
+        RowData beforeRow = result.getRow(3, 3);
+        assertThat(beforeRow).isNotNull();
+        assertThat(beforeRow.getInt(0)).isEqualTo(3);
+        assertThat(beforeRow.getString(1).toString()).isEqualTo("Charlie");
+        assertThat(beforeRow.getLong(2)).isEqualTo(1000L);
+
+        // Verify after is null for DELETE
+        assertThat(result.isNullAt(4)).isTrue();
+    }
+
+    @Test
+    void testUpdateBeforeReturnsNull() throws Exception {
+        LogRecord record =
+                createLogRecord(ChangeType.UPDATE_BEFORE, 400L, 4000L, 4, 
"Diana", 2000L);
+
+        // UPDATE_BEFORE should return null (buffered for merging)
+        RowData result = converter.convert(record);
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testUpdateAfterWithoutBeforeThrows() throws Exception {
+        // Sending +U without a preceding -U should throw
+        LogRecord record = createLogRecord(ChangeType.UPDATE_AFTER, 500L, 
5000L, 5, "Eve", 6000L);
+
+        assertThatThrownBy(() -> converter.convert(record))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("UPDATE_AFTER (+U) without a preceding 
UPDATE_BEFORE (-U)");
+    }
+
+    @Test
+    void testAppendOnlyUnsupported() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.APPEND_ONLY, 600L, 
6000L, 6, "Frank", 7000L);
+
+        assertThatThrownBy(() -> converter.convert(record))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("$binlog virtual table does not support 
change type");
+    }
+
+    @Test
+    void testProducedTypeHasNestedRowColumns() {
+        org.apache.flink.table.types.logical.RowType producedType = 
converter.getProducedType();
+
+        // Should have 5 columns: _change_type, _log_offset, 
_commit_timestamp, before, after
+        assertThat(producedType.getFieldCount()).isEqualTo(5);
+
+        // Check column names
+        assertThat(producedType.getFieldNames())
+                .containsExactly(
+                        "_change_type", "_log_offset", "_commit_timestamp", 
"before", "after");
+
+        // Check metadata column types
+        assertThat(producedType.getTypeAt(0))
+                
.isInstanceOf(org.apache.flink.table.types.logical.VarCharType.class);
+        assertThat(producedType.getTypeAt(1))
+                
.isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class);
+        assertThat(producedType.getTypeAt(2))
+                
.isInstanceOf(org.apache.flink.table.types.logical.LocalZonedTimestampType.class);
+
+        // Check before and after are ROW types
+        assertThat(producedType.getTypeAt(3))
+                
.isInstanceOf(org.apache.flink.table.types.logical.RowType.class);
+        assertThat(producedType.getTypeAt(4))
+                
.isInstanceOf(org.apache.flink.table.types.logical.RowType.class);
+
+        // Check nested ROW has the original columns
+        org.apache.flink.table.types.logical.RowType beforeType =
+                (org.apache.flink.table.types.logical.RowType) 
producedType.getTypeAt(3);
+        assertThat(beforeType.getFieldNames()).containsExactly("id", "name", 
"amount");
+
+        // before/after ROW types should be nullable
+        assertThat(producedType.getTypeAt(3).isNullable()).isTrue();
+        assertThat(producedType.getTypeAt(4).isNullable()).isTrue();
+    }
+
+    @Test
+    void testMultipleUpdatesInSequence() throws Exception {
+        // First update pair
+        converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 10L, 
1000L, 1, "A", 100L));
+        RowData result1 =
+                converter.convert(
+                        createLogRecord(ChangeType.UPDATE_AFTER, 11L, 1000L, 
1, "B", 200L));
+        assertThat(result1).isNotNull();
+        
assertThat(result1.getString(0)).isEqualTo(StringData.fromString("update"));
+
+        // Second update pair (converter state should be clean after the first merge)
+        converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 20L, 
2000L, 1, "B", 200L));
+        RowData result2 =
+                converter.convert(
+                        createLogRecord(ChangeType.UPDATE_AFTER, 21L, 2000L, 
1, "C", 300L));
+        assertThat(result2).isNotNull();
+        
assertThat(result2.getString(0)).isEqualTo(StringData.fromString("update"));
+        assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U
+    }
+
+    private LogRecord createLogRecord(
+            ChangeType changeType, long offset, long timestamp, int id, String 
name, long amount)
+            throws Exception {
+        // Create an IndexedRow with test data
+        IndexedRow row = new IndexedRow(testRowType.getChildren().toArray(new 
DataType[0]));
+        try (IndexedRowWriter writer =
+                new IndexedRowWriter(testRowType.getChildren().toArray(new 
DataType[0]))) {
+            writer.writeInt(id);
+            writer.writeString(BinaryString.fromString(name));
+            writer.writeLong(amount);
+            writer.complete();
+
+            row.pointTo(writer.segment(), 0, writer.position());
+
+            return new GenericRecord(offset, timestamp, changeType, row);
+        }
+    }
+}
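
The test above pins down how UPDATE_BEFORE/UPDATE_AFTER pairs are folded into a single "update" row, with the offset and timestamp taken from the -U record. The self-contained sketch below restates that buffering contract using plain strings instead of Fluss/Flink row types; it makes no claim about BinlogRowConverter's internals.

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Folds a -U/+U pair into one logical "update" event; inserts and deletes pass through.
final class UpdateMergeSketch {
    private Map.Entry<Long, String> pendingBefore; // offset + payload of the buffered -U

    String accept(String changeType, long offset, String payload) {
        switch (changeType) {
            case "UPDATE_BEFORE":
                pendingBefore = new SimpleEntry<>(offset, payload); // buffer, emit nothing yet
                return null;
            case "UPDATE_AFTER":
                if (pendingBefore == null) {
                    throw new IllegalStateException(
                            "UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U)");
                }
                // Merged row keeps the offset of the -U record (first entry of the pair).
                String merged = String.format(
                        "update@%d before=%s after=%s",
                        pendingBefore.getKey(), pendingBefore.getValue(), payload);
                pendingBefore = null; // state is clean for the next update pair
                return merged;
            case "INSERT":
                return String.format("insert@%d after=%s", offset, payload);
            case "DELETE":
                return String.format("delete@%d before=%s", offset, payload);
            default:
                throw new UnsupportedOperationException(
                        "$binlog virtual table does not support change type " + changeType);
        }
    }

    public static void main(String[] args) {
        UpdateMergeSketch sketch = new UpdateMergeSketch();
        System.out.println(sketch.accept("INSERT", 0, "{id=1, name=Alice}"));
        System.out.println(sketch.accept("UPDATE_BEFORE", 1, "{id=1, name=Alice}")); // prints null
        System.out.println(sketch.accept("UPDATE_AFTER", 2, "{id=1, name=Alice2}"));
        System.out.println(sketch.accept("DELETE", 3, "{id=1, name=Alice2}"));
    }
}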
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
index c569ba86d..55f83a514 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
@@ -65,7 +65,7 @@ class ChangelogRowConverterTest {
         assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
 
         // Verify metadata columns
-        assertThat(result.getString(0)).isEqualTo(StringData.fromString("+I"));
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert"));
         assertThat(result.getLong(1)).isEqualTo(100L); // log offset
         assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp
 
@@ -88,7 +88,7 @@ class ChangelogRowConverterTest {
         assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
 
         // Verify change type metadata
-        assertThat(result.getString(0)).isEqualTo(StringData.fromString("-U"));
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_before"));
         assertThat(result.getLong(1)).isEqualTo(200L);
 
         // Verify physical columns
@@ -103,7 +103,7 @@ class ChangelogRowConverterTest {
 
         RowData result = converter.convert(record);
 
-        assertThat(result.getString(0)).isEqualTo(StringData.fromString("+U"));
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_after"));
         assertThat(result.getLong(1)).isEqualTo(201L);
         assertThat(result.getInt(3)).isEqualTo(2);
         assertThat(result.getString(4).toString()).isEqualTo("Bob");
@@ -116,7 +116,7 @@ class ChangelogRowConverterTest {
 
         RowData result = converter.convert(record);
 
-        assertThat(result.getString(0)).isEqualTo(StringData.fromString("-D"));
+        
assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete"));
         assertThat(result.getLong(1)).isEqualTo(300L);
         assertThat(result.getInt(3)).isEqualTo(3);
         assertThat(result.getString(4).toString()).isEqualTo("Charlie");
@@ -151,7 +151,7 @@ class ChangelogRowConverterTest {
                         converter
                                 .convert(createLogRecord(ChangeType.INSERT, 
1L, 1, "Test", 100L))
                                 .getString(0))
-                .isEqualTo(StringData.fromString("+I"));
+                .isEqualTo(StringData.fromString("insert"));
 
         assertThat(
                         converter
@@ -159,7 +159,7 @@ class ChangelogRowConverterTest {
                                         createLogRecord(
                                                 ChangeType.UPDATE_BEFORE, 2L, 
1, "Test", 100L))
                                 .getString(0))
-                .isEqualTo(StringData.fromString("-U"));
+                .isEqualTo(StringData.fromString("update_before"));
 
         assertThat(
                         converter
@@ -167,13 +167,13 @@ class ChangelogRowConverterTest {
                                         createLogRecord(
                                                 ChangeType.UPDATE_AFTER, 3L, 
1, "Test", 100L))
                                 .getString(0))
-                .isEqualTo(StringData.fromString("+U"));
+                .isEqualTo(StringData.fromString("update_after"));
 
         assertThat(
                         converter
                                 .convert(createLogRecord(ChangeType.DELETE, 
4L, 1, "Test", 100L))
                                 .getString(0))
-                .isEqualTo(StringData.fromString("-D"));
+                .isEqualTo(StringData.fromString("delete"));
 
         // For log tables (append-only)
         assertThat(
@@ -182,7 +182,7 @@ class ChangelogRowConverterTest {
                                         createLogRecord(
                                                 ChangeType.APPEND_ONLY, 5L, 1, 
"Test", 100L))
                                 .getString(0))
-                .isEqualTo(StringData.fromString("+A"));
+                .isEqualTo(StringData.fromString("insert"));
     }
 
     private LogRecord createLogRecord(

