This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 259a143d49 [cdc] Add metadata column support for Kafka CDC connector 
(#7315)
259a143d49 is described below

commit 259a143d49432dd74156d33dbcec38effafd376a
Author: Nick Del Nano <[email protected]>
AuthorDate: Thu Mar 5 02:25:03 2026 -0800

    [cdc] Add metadata column support for Kafka CDC connector (#7315)
    
    This PR supersedes https://github.com/apache/paimon/pull/6353 - I took
    over this work from my colleague @gmdfalk
    
    ## Description
    Add --metadata_column support to the Paimon Kafka CDC connector, similar
    to the existing options for MySQL and Postgres:
    https://github.com/apache/paimon/pull/2077
    Also add an optional --metadata_column_prefix to avoid conflicts with
    existing Paimon fields like topic, timestamp, etc.
    
    Supported metadata columns are those on
    org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:
    
    - topic
    - partition
    - offset
    - timestamp
    - timestampType: the name of the enum value, i.e. NoTimestampType,
    CreateTime or LogAppendTime
    
    The feature is backward compatible. It is only active when
    --metadata_column is supplied or, equivalently, when
    SynchronizationActionBase.withMetadataColumns is used.
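    For illustration, a kafka_sync_table invocation with the new flags could
    look like the following sketch. All paths, names, and conf values are
    placeholders; the required --kafka_conf keys are the ones listed in the
    existing Kafka CDC docs:

    ```shell
    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action.jar \
        kafka_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table orders \
        --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
        --kafka_conf topic=orders \
        --kafka_conf properties.group.id=paimon-sync \
        --kafka_conf value.format=debezium-json \
        --metadata_column topic \
        --metadata_column offset \
        --metadata_column_prefix __kafka_
    ```

    With this prefix, the metadata is stored in the Paimon table as
    __kafka_topic and __kafka_offset, avoiding collisions with source
    columns of the same name.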
    
    ## Motivation
    This is a requested feature:
    https://github.com/apache/paimon/issues/3210
    
    We primarily use this feature for two purposes:
    
    1. Troubleshooting and data lineage (e.g. where in our Kafka
    infrastructure does this Paimon row come from?)
    2. Mapping large Kafka topic partitions 1:1 to Paimon buckets to avoid
    reshuffling (see the issue this would help solve:
    https://github.com/apache/paimon/issues/3249)
    
    ## Tests
    Unit and Integration Tests
    
    ## API and Format
    No changes to public APIs or storage format.
    
    The changes here are contained within the Flink CDC package, but I did
    have to update CdcSourceRecord since it previously didn't provide a way
    to surface arbitrary metadata for a record.
    
    The metadata attribute on CdcSourceRecord is intentionally a generic Map
    so that it can later be used to add metadata support for other
    connectors, such as Pulsar or Mongo, that are not yet implemented.
    
    ## Documentation
    Added the new --metadata_column and --metadata_column_prefix parameters
    to the Kafka CDC docs.
    
    ## Dev notes
    For running integration tests on macOS with Rancher Desktop, I had to
    properly expose the Docker socket to Testcontainers, e.g. system-wide
    via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.
    
    ---------
    
    Signed-off-by: Max Falk <[email protected]>
    Co-authored-by: Max Falk <[email protected]>
    Co-authored-by: Max Falk <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 docs/content/cdc-ingestion/kafka-cdc.md            |   4 +
 .../shortcodes/generated/kafka_sync_database.html  |   8 +
 .../shortcodes/generated/kafka_sync_table.html     |  10 +-
 .../flink/action/cdc/CdcActionCommonUtils.java     |   1 +
 .../flink/action/cdc/CdcMetadataConverter.java     |  16 +-
 .../flink/action/cdc/CdcMetadataProcessor.java     |  11 +-
 .../paimon/flink/action/cdc/CdcSourceRecord.java   |  40 ++-
 .../action/cdc/PrefixedMetadataConverter.java      |  62 ++++
 .../paimon/flink/action/cdc/SyncJobHandler.java    |   5 +-
 .../action/cdc/SyncTableActionFactoryBase.java     |   6 +
 .../action/cdc/SynchronizationActionBase.java      |  12 +
 .../cdc/format/AbstractJsonRecordParser.java       |   3 +
 .../action/cdc/format/AbstractRecordParser.java    |  36 ++-
 .../paimon/flink/action/cdc/format/DataFormat.java |  19 ++
 .../cdc/format/aliyun/AliyunRecordParser.java      |   2 +
 .../action/cdc/format/canal/CanalRecordParser.java |   1 +
 .../format/debezium/DebeziumAvroRecordParser.java  |   2 +
 .../format/debezium/DebeziumBsonRecordParser.java  |   1 +
 .../format/debezium/DebeziumJsonRecordParser.java  |   2 +
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |  13 +
 .../KafkaDebeziumAvroDeserializationSchema.java    |   4 +-
 .../KafkaDebeziumJsonDeserializationSchema.java    |   5 +-
 .../action/cdc/kafka/KafkaMetadataConverter.java   | 136 ++++++++
 .../cdc/kafka/KafkaSyncDatabaseActionFactory.java  |   2 +
 .../cdc/kafka/KafkaSyncTableActionFactory.java     |   2 +
 .../flink/action/cdc/CdcActionITCaseBase.java      |  10 +
 .../flink/action/cdc/KafkaMetadataE2ETest.java     | 360 +++++++++++++++++++++
 .../kafka/KafkaAWSDMSSyncTableActionITCase.java    |   6 +
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |   6 +
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  |   6 +
 .../kafka/KafkaMaxwellSyncTableActionITCase.java   |   6 +
 .../cdc/kafka/KafkaMetadataConverterTest.java      | 194 +++++++++++
 .../cdc/kafka/KafkaOggSyncTableActionITCase.java   |   6 +
 .../cdc/kafka/KafkaSyncTableActionITCase.java      |  54 ++++
 .../table/metadatacolumn/aws-dms-data-1.txt        |  19 ++
 .../canal/table/metadatacolumn/canal-data-1.txt    |  19 ++
 .../table/metadatacolumn/debezium-data-1.txt       |  19 ++
 .../table/metadatacolumn/maxwell-data-1.txt        |  19 ++
 .../kafka/ogg/table/metadatacolumn/ogg-data-1.txt  |  19 ++
 39 files changed, 1131 insertions(+), 15 deletions(-)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md 
b/docs/content/cdc-ingestion/kafka-cdc.md
index 56c7bf0730..81ec7c18ff 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -103,6 +103,8 @@ To use this feature through `flink run`, run the following 
shell command.
     [--primary_keys <primary-keys>] \
     [--type_mapping to-string] \
     [--computed_column <'column-name=expr-name(args[, ...])'> 
[--computed_column ...]] \
+    [--metadata_column <metadata-column> [--metadata_column ...]] \
+    [--metadata_column_prefix <metadata-column-prefix>] \
     [--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-sink-conf> [--table_conf 
<paimon-table-sink-conf> ...]]
@@ -215,6 +217,8 @@ To use this feature through `flink run`, run the following 
shell command.
     [--partition_keys <partition_keys>] \
     [--primary_keys <primary-keys>] \
     [--computed_column <'column-name=expr-name(args[, ...])'> 
[--computed_column ...]] \
+    [--metadata_column <metadata-column> [--metadata_column ...]] \
+    [--metadata_column_prefix <metadata-column-prefix>] \
     [--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-sink-conf> [--table_conf 
<paimon-table-sink-conf> ...]]
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index e8d5898c34..7e31babec5 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -94,6 +94,14 @@ under the License.
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. NOTICE: It returns null if the referenced column does not exist 
in the source table.</td>
     </tr>
+    <tr>
+        <td><h5>--metadata_column</h5></td>
+        <td>--metadata_column is used to specify which metadata columns to 
include in the output schema of the connector. Metadata columns provide 
additional information related to the source data. Available values are topic, 
partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 
'CreateTime' or 'LogAppendTime').</td>
+    </tr>
+    <tr>
+        <td><h5>--metadata_column_prefix</h5></td>
+        <td>--metadata_column_prefix is optionally used to set a prefix for 
metadata columns in the Paimon table to avoid conflicts with existing 
attributes. For example, with prefix "__kafka_", the metadata column "topic" 
will be stored as "__kafka_topic" field.</td>
+    </tr>
     <tr>
         <td><h5>--eager_init</h5></td>
         <td>It is default false. If true, all relevant tables commiter will be 
initialized eagerly, which means those tables could be forced to create 
snapshot.</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html 
b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 10669f594f..029bf1a5ed 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -70,6 +70,14 @@ under the License.
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. </td>
     </tr>
+    <tr>
+        <td><h5>--metadata_column</h5></td>
+        <td>--metadata_column is used to specify which metadata columns to 
include in the output schema of the connector. Metadata columns provide 
additional information related to the source data. Available values are topic, 
partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 
'CreateTime' or 'LogAppendTime').</td>
+    </tr>
+    <tr>
+        <td><h5>--metadata_column_prefix</h5></td>
+        <td>--metadata_column_prefix is optionally used to set a prefix for 
metadata columns in the Paimon table to avoid conflicts with existing 
attributes. For example, with prefix "__kafka_", the metadata column "topic" 
will be stored as "__kafka_topic" field.</td>
+    </tr>
     <tr>
         <td><h5>--kafka_conf</h5></td>
         <td>The configuration for Flink Kafka sources. Each configuration 
should be specified in the format `key=value`. `properties.bootstrap.servers`, 
`topic/topic-pattern`, `properties.group.id`,  and `value.format` are required 
configurations, others are optional.See its <a 
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options";>document</a>
 for a complete list of configurations.</td>
@@ -83,4 +91,4 @@ under the License.
         <td>The configuration for Paimon table sink. Each configuration should 
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of table configurations.</td>
     </tr>
     </tbody>
-</table>
\ No newline at end of file
+</table>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 79ecb1941e..dc487c1033 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -68,6 +68,7 @@ public class CdcActionCommonUtils {
     public static final String PRIMARY_KEYS = "primary_keys";
     public static final String COMPUTED_COLUMN = "computed_column";
     public static final String METADATA_COLUMN = "metadata_column";
+    public static final String METADATA_COLUMN_PREFIX = 
"metadata_column_prefix";
     public static final String MULTIPLE_TABLE_PARTITION_KEYS = 
"multiple_table_partition_keys";
     public static final String EAGER_INIT = "eager_init";
     public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java
index 3ffeaa3d78..0ee292e251 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java
@@ -34,13 +34,25 @@ import java.util.TimeZone;
  * A functional interface for converting CDC metadata.
  *
  * <p>This interface provides a mechanism to convert Change Data Capture (CDC) 
metadata from a given
- * {@link JsonNode} source. Implementations of this interface can be used to 
process and transform
- * metadata entries from CDC sources.
+ * {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this 
interface can be used
+ * to process and transform metadata entries from CDC sources.
  */
 public interface CdcMetadataConverter extends Serializable {
 
     String read(JsonNode payload);
 
+    /**
+     * Read metadata from a CDC source record. Default implementation throws
+     * UnsupportedOperationException to maintain backward compatibility.
+     *
+     * @param record the CDC source record
+     * @return the metadata value as a string
+     */
+    default String read(CdcSourceRecord record) {
+        throw new UnsupportedOperationException(
+                "This metadata converter does not support reading from 
CdcSourceRecord");
+    }
+
     DataType dataType();
 
     String columnName();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java
index 9fdd7a4377..ce1114b3fd 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;
+
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
 import java.util.Arrays;
@@ -49,7 +51,14 @@ public enum CdcMetadataProcessor {
             new CdcMetadataConverter.DatabaseNameConverter(),
             new CdcMetadataConverter.TableNameConverter(),
             new CdcMetadataConverter.SchemaNameConverter(),
-            new CdcMetadataConverter.OpTsConverter());
+            new CdcMetadataConverter.OpTsConverter()),
+    KAFKA_METADATA_PROCESSOR(
+            SyncJobHandler.SourceType.KAFKA,
+            new KafkaMetadataConverter.TopicConverter(),
+            new KafkaMetadataConverter.PartitionConverter(),
+            new KafkaMetadataConverter.OffsetConverter(),
+            new KafkaMetadataConverter.TimestampConverter(),
+            new KafkaMetadataConverter.TimestampTypeConverter());
 
     private final SyncJobHandler.SourceType sourceType;
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
index 51a14534c4..d0309ff3c7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
@@ -21,6 +21,9 @@ package org.apache.paimon.flink.action.cdc;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 /** A data change record from the CDC source. */
@@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable {
     // TODO Use generics to support more scenarios.
     private final Object value;
 
+    // Generic metadata map - any source can add metadata
+    private final Map<String, Object> metadata;
+
     public CdcSourceRecord(@Nullable String topic, @Nullable Object key, 
Object value) {
-        this.topic = topic;
-        this.key = key;
-        this.value = value;
+        this(topic, key, value, null);
     }
 
     public CdcSourceRecord(Object value) {
-        this(null, null, value);
+        this(null, null, value, null);
+    }
+
+    public CdcSourceRecord(
+            @Nullable String topic,
+            @Nullable Object key,
+            Object value,
+            @Nullable Map<String, Object> metadata) {
+        this.topic = topic;
+        this.key = key;
+        this.value = value;
+        this.metadata =
+                metadata != null
+                        ? Collections.unmodifiableMap(new HashMap<>(metadata))
+                        : Collections.emptyMap();
     }
 
     @Nullable
@@ -59,6 +77,15 @@ public class CdcSourceRecord implements Serializable {
         return value;
     }
 
+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
+    @Nullable
+    public Object getMetadata(String key) {
+        return metadata.get(key);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof CdcSourceRecord)) {
@@ -68,12 +95,13 @@ public class CdcSourceRecord implements Serializable {
         CdcSourceRecord that = (CdcSourceRecord) o;
         return Objects.equals(topic, that.topic)
                 && Objects.equals(key, that.key)
-                && Objects.equals(value, that.value);
+                && Objects.equals(value, that.value)
+                && Objects.equals(metadata, that.metadata);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, key, value);
+        return Objects.hash(topic, key, value, metadata);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/PrefixedMetadataConverter.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/PrefixedMetadataConverter.java
new file mode 100644
index 0000000000..9a4ef9b49d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/PrefixedMetadataConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.types.DataType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Wraps a {@link CdcMetadataConverter} to add a prefix to its column name.
+ *
+ * <p>This decorator allows adding prefixes like "__kafka_" to metadata column 
names to avoid
+ * collisions with source data columns, while keeping the underlying converter 
logic unchanged.
+ */
+public class PrefixedMetadataConverter implements CdcMetadataConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    private final CdcMetadataConverter delegate;
+    private final String prefix;
+
+    public PrefixedMetadataConverter(CdcMetadataConverter delegate, String 
prefix) {
+        this.delegate = delegate;
+        this.prefix = prefix != null ? prefix : "";
+    }
+
+    @Override
+    public String columnName() {
+        return prefix + delegate.columnName();
+    }
+
+    @Override
+    public String read(JsonNode payload) {
+        return delegate.read(payload);
+    }
+
+    @Override
+    public String read(CdcSourceRecord record) {
+        return delegate.read(record);
+    }
+
+    @Override
+    public DataType dataType() {
+        return delegate.dataType();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index 323078aef6..22377f7ee5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -207,9 +207,10 @@ public class SyncJobHandler {
                 return new PostgresRecordParser(
                         cdcSourceConfig, computedColumns, typeMapping, 
metadataConverters);
             case KAFKA:
+                return provideDataFormat()
+                        .createParser(typeMapping, computedColumns, 
metadataConverters);
             case PULSAR:
-                DataFormat dataFormat = provideDataFormat();
-                return dataFormat.createParser(typeMapping, computedColumns);
+                return provideDataFormat().createParser(typeMapping, 
computedColumns);
             case MONGODB:
                 return new MongoDBRecordParser(computedColumns, 
cdcSourceConfig);
             default:
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
index cb9b678d19..1e8c292ff1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN_PREFIX;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
@@ -64,6 +65,11 @@ public abstract class SyncTableActionFactoryBase
         }
 
         if (params.has(METADATA_COLUMN)) {
+            // Parse optional prefix first
+            if (params.has(METADATA_COLUMN_PREFIX)) {
+                
action.withMetadataColumnPrefix(params.get(METADATA_COLUMN_PREFIX));
+            }
+
             List<String> metadataColumns =
                     new ArrayList<>(params.getMultiParameter(METADATA_COLUMN));
             if (metadataColumns.size() == 1) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index d4b2b15394..fd27f6faad 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -72,6 +72,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
     // in paimon schema if pkeys are not specified in action command
     protected boolean syncPKeysFromSourceSchema = true;
     protected CdcMetadataConverter[] metadataConverters = new 
CdcMetadataConverter[] {};
+    protected String metadataColumnPrefix = "";
 
     public SynchronizationActionBase(
             String database,
@@ -101,10 +102,21 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
         this.metadataConverters =
                 metadataColumns.stream()
                         .map(this.syncJobHandler::provideMetadataConverter)
+                        .map(
+                                converter ->
+                                        metadataColumnPrefix.isEmpty()
+                                                ? converter
+                                                : new 
PrefixedMetadataConverter(
+                                                        converter, 
metadataColumnPrefix))
                         .toArray(CdcMetadataConverter[]::new);
         return this;
     }
 
+    public SynchronizationActionBase withMetadataColumnPrefix(String prefix) {
+        this.metadataColumnPrefix = prefix != null ? prefix : "";
+        return this;
+    }
+
     public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) {
         this.syncPKeysFromSourceSchema = flag;
         return this;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
index 76289aa355..5eb80a7151 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
@@ -68,6 +68,7 @@ public abstract class AbstractJsonRecordParser extends 
AbstractRecordParser {
     }
 
     protected void setRoot(CdcSourceRecord record) {
+        super.setRoot(record); // Store current record for metadata access
         root = (JsonNode) record.getValue();
     }
 
@@ -104,6 +105,8 @@ public abstract class AbstractJsonRecordParser extends 
AbstractRecordParser {
                                             return 
Objects.toString(entry.getValue());
                                         }));
         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
+
         return rowData;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 85442067b9..5c092d881e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.format;
 
+import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -55,10 +56,20 @@ public abstract class AbstractRecordParser
     protected static final String FIELD_DATABASE = "database";
     protected final TypeMapping typeMapping;
     protected final List<ComputedColumn> computedColumns;
+    protected CdcMetadataConverter[] metadataConverters;
+    protected CdcSourceRecord currentRecord; // Store current record for 
metadata access
 
     public AbstractRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
         this.typeMapping = typeMapping;
         this.computedColumns = computedColumns;
+        this.metadataConverters = new CdcMetadataConverter[0];
+    }
+
+    /** Sets metadata converters and returns this parser for chaining. */
+    public AbstractRecordParser withMetadataConverters(CdcMetadataConverter[] 
metadataConverters) {
+        this.metadataConverters =
+                metadataConverters != null ? metadataConverters : new 
CdcMetadataConverter[0];
+        return this;
     }
 
     @Nullable
@@ -88,7 +99,11 @@ public abstract class AbstractRecordParser
         }
     }
 
-    protected abstract void setRoot(CdcSourceRecord record);
+    protected void setRoot(CdcSourceRecord record) {
+        this.currentRecord = record;
+        // Call the original setRoot method for backward compatibility
+        // Subclasses can override this method as they used to
+    }
 
     protected abstract List<RichCdcMultiplexRecord> extractRecords();
 
@@ -111,6 +126,25 @@ public abstract class AbstractRecordParser
                 });
     }
 
+    /** Extract metadata values using metadata converters. */
+    protected void evalMetadataColumns(
+            Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
+        for (CdcMetadataConverter metadataConverter : metadataConverters) {
+            try {
+                String value = metadataConverter.read(currentRecord);
+                if (value != null) {
+                    rowData.put(metadataConverter.columnName(), value);
+                }
+                schemaBuilder.column(metadataConverter.columnName(), 
metadataConverter.dataType());
+            } catch (UnsupportedOperationException e) {
+                // This converter doesn't support CdcSourceRecord, skip it
+                LOG.debug(
+                        "Metadata converter {} does not support 
CdcSourceRecord",
+                        metadataConverter.getClass().getSimpleName());
+            }
+        }
+    }
+
     /** Handle case sensitivity here. */
     protected RichCdcMultiplexRecord createRecord(
             RowKind rowKind, Map<String, String> data, CdcSchema.Builder 
schemaBuilder) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index 711f596ac5..78c9828ee1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.format;
 
+import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -38,12 +39,30 @@ public interface DataFormat {
      * Creates a new instance of {@link AbstractRecordParser} for this data 
format with the
      * specified configurations.
      *
+     * @param typeMapping Type mapping configuration
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link AbstractRecordParser}.
      */
     AbstractRecordParser createParser(
             TypeMapping typeMapping, List<ComputedColumn> computedColumns);
 
+    /**
+     * Creates a new instance of {@link AbstractRecordParser} for this data 
format with the
+     * specified configurations including metadata converters.
+     *
+     * @param typeMapping Type mapping configuration
+     * @param computedColumns List of computed columns to be considered by the 
parser.
+     * @param metadataConverters Array of metadata converters for extracting 
CDC metadata
+     * @return A new instance of {@link AbstractRecordParser}.
+     */
+    default AbstractRecordParser createParser(
+            TypeMapping typeMapping,
+            List<ComputedColumn> computedColumns,
+            CdcMetadataConverter[] metadataConverters) {
+        return createParser(typeMapping, computedColumns)
+                .withMetadataConverters(metadataConverters);
+    }
+
     KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
             Configuration cdcSourceConfig);
 
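The default-method overload above is what keeps the feature backwards compatible: existing `DataFormat` implementations only provide the two-argument `createParser`, and the new three-argument variant delegates to it. A standalone sketch of that pattern, using simplified stand-in types rather than Paimon's actual interfaces:

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for AbstractRecordParser: a fluent setter returns `this`.
interface RecordParser {
    RecordParser withMetadataConverters(String[] converters);

    List<String> converters();
}

// Stand-in for DataFormat: one abstract method plus a default overload,
// so every existing implementation keeps compiling unchanged.
interface Format {
    RecordParser createParser(List<String> computedColumns);

    default RecordParser createParser(List<String> computedColumns, String[] converters) {
        return createParser(computedColumns).withMetadataConverters(converters);
    }
}

public class DefaultMethodSketch {
    static class SimpleParser implements RecordParser {
        private List<String> converters = List.of();

        public RecordParser withMetadataConverters(String[] converters) {
            this.converters = Arrays.asList(converters);
            return this;
        }

        public List<String> converters() {
            return converters;
        }
    }

    public static void main(String[] args) {
        // A lambda only needs to supply the abstract method; the default
        // overload wires the converters in afterwards.
        Format format = computedColumns -> new SimpleParser();
        RecordParser parser = format.createParser(List.of(), new String[] {"topic", "offset"});
        System.out.println(parser.converters()); // [topic, offset]
    }
}
```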
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
index e14e4ab4b7..4f695d11a0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -192,6 +192,8 @@ public class AliyunRecordParser extends AbstractJsonRecordParser {
         }
 
         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
+
         return rowData;
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 5942ad0276..21d5935aac 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -181,6 +181,7 @@ public class CanalRecordParser extends AbstractJsonRecordParser {
         }
 
         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
         return rowData;
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
index 7219aab3b6..ef7b52915c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
@@ -80,6 +80,7 @@ public class DebeziumAvroRecordParser extends AbstractRecordParser {
 
     @Override
     protected void setRoot(CdcSourceRecord record) {
+        super.setRoot(record); // Store current record for metadata access
         keyRecord = (GenericRecord) record.getKey();
         valueRecord = (GenericRecord) record.getValue();
     }
@@ -159,6 +160,7 @@ public class DebeziumAvroRecordParser extends AbstractRecordParser {
         }
 
         evalComputedColumns(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
         return resultMap;
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 5c13170638..134ed8b383 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -137,6 +137,7 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
         }
 
         evalComputedColumns(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
 
         return resultMap;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 19156fb916..14dd00ee3d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -119,6 +119,7 @@ public class DebeziumJsonRecordParser extends AbstractJsonRecordParser {
 
     @Override
     protected void setRoot(CdcSourceRecord record) {
+        super.setRoot(record); // Store current record for metadata access
         JsonNode node = (JsonNode) record.getValue();
 
         hasSchema = false;
@@ -212,6 +213,7 @@ public class DebeziumJsonRecordParser extends AbstractJsonRecordParser {
         }
 
         evalComputedColumns(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
 
         return resultMap;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index b937ad2eda..6391981d0f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -320,6 +321,18 @@ public class KafkaActionUtils {
         }
     }
 
+    protected static Map<String, Object> extractKafkaMetadata(
+            ConsumerRecord<byte[], byte[]> message) {
+        // Add the Kafka message metadata that can be used with --metadata_column
+        Map<String, Object> kafkaMetadata = new HashMap<>();
+        kafkaMetadata.put("topic", message.topic());
+        kafkaMetadata.put("partition", message.partition());
+        kafkaMetadata.put("offset", message.offset());
+        kafkaMetadata.put("timestamp", message.timestamp());
+        kafkaMetadata.put("timestamp_type", message.timestampType().name);
+        return kafkaMetadata;
+    }
+
     private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
 
         private final KafkaConsumer<byte[], byte[]> consumer;
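The new `extractKafkaMetadata` helper publishes exactly five keys, matching the supported `--metadata_column` names. The same mapping can be sketched standalone, with a small stand-in class in place of Kafka's `ConsumerRecord` (the stub and its field names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaMetadataMapSketch {

    // Stand-in for the fields read off org.apache.kafka.clients.consumer.ConsumerRecord.
    static final class RecordStub {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final String timestampType;

        RecordStub(String topic, int partition, long offset, long timestamp, String timestampType) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.timestampType = timestampType;
        }
    }

    // Mirrors extractKafkaMetadata: one map entry per supported metadata column.
    static Map<String, Object> extract(RecordStub r) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("topic", r.topic);
        m.put("partition", r.partition);
        m.put("offset", r.offset);
        m.put("timestamp", r.timestamp);
        m.put("timestamp_type", r.timestampType);
        return m;
    }

    public static void main(String[] args) {
        RecordStub r = new RecordStub("orders", 5, 12345L, 1640995200000L, "CreateTime");
        System.out.println(extract(r));
    }
}
```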
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index eea364d460..1f98c60e8d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -76,7 +76,9 @@ public class KafkaDebeziumAvroDeserializationSchema
             key = (GenericRecord) keyContainerWithVersion.container();
         }
         GenericRecord value = (GenericRecord) valueContainerWithVersion.container();
-        return new CdcSourceRecord(topic, key, value);
+
+        return new CdcSourceRecord(
+                topic, key, value, KafkaActionUtils.extractKafkaMetadata(message));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 887af5f606..1bd7ed25a0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
 
@@ -76,7 +77,9 @@ public class KafkaDebeziumJsonDeserializationSchema
             }
 
             JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
-            return new CdcSourceRecord(null, keyNode, valueNode);
+
+            Map<String, Object> kafkaMetadata = KafkaActionUtils.extractKafkaMetadata(message);
+            return new CdcSourceRecord(message.topic(), keyNode, valueNode, kafkaMetadata);
         } catch (Exception e) {
             LOG.error("Invalid Json:\n{}", new String(message.value()));
             throw e;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java
new file mode 100644
index 0000000000..0bd9f42e31
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.TimeZone;
+
+/**
+ * Kafka-specific implementations of {@link CdcMetadataConverter} for extracting Kafka message
+ * metadata.
+ *
+ * <p>These converters read from the generic metadata map in {@link CdcSourceRecord} to extract
+ * Kafka-specific metadata like topic, partition, offset, timestamp, and timestamp type.
+ */
+public class KafkaMetadataConverter implements CdcMetadataConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final String metadataKey; // Key to lookup in metadata map
+    private final String columnName; // Column name in schema (same as metadataKey)
+    private final DataType dataType;
+
+    public KafkaMetadataConverter(String metadataKey, DataType dataType) {
+        this.metadataKey = metadataKey;
+        this.columnName =
+                metadataKey; // Use simple name; prefix is added by PrefixedMetadataConverter
+        this.dataType = dataType;
+    }
+
+    @Override
+    public String read(JsonNode source) {
+        throw new UnsupportedOperationException(
+                "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
+    }
+
+    @Override
+    public String read(CdcSourceRecord record) {
+        Object metadata = record.getMetadata(this.metadataKey);
+        return metadata != null ? metadata.toString() : null;
+    }
+
+    @Override
+    public DataType dataType() {
+        return this.dataType;
+    }
+
+    @Override
+    public String columnName() {
+        return this.columnName;
+    }
+
+    /** Converter for Kafka topic name. */
+    public static class TopicConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        public TopicConverter() {
+            super("topic", DataTypes.STRING());
+        }
+
+        @Override
+        public String read(CdcSourceRecord record) {
+            // Topic is stored in the CdcSourceRecord itself, not in metadata map
+            return record.getTopic();
+        }
+    }
+
+    /** Converter for Kafka partition number. */
+    public static class PartitionConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        public PartitionConverter() {
+            super("partition", DataTypes.INT());
+        }
+    }
+
+    /** Converter for Kafka message offset. */
+    public static class OffsetConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        public OffsetConverter() {
+            super("offset", DataTypes.BIGINT());
+        }
+    }
+
+    /** Converter for Kafka message timestamp. */
+    public static class TimestampConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        public TimestampConverter() {
+            super("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        }
+
+        @Override
+        public String read(CdcSourceRecord record) {
+            Object timestamp = record.getMetadata(this.metadataKey);
+            if (timestamp instanceof Long) {
+                return DateTimeUtils.formatTimestamp(
+                        Timestamp.fromEpochMillis((Long) timestamp), TimeZone.getDefault(), 3);
+            }
+            return null;
+        }
+    }
+
+    /** Converter for Kafka timestamp type. */
+    public static class TimestampTypeConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        public TimestampTypeConverter() {
+            super("timestamp_type", DataTypes.STRING());
+        }
+    }
+}
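`TimestampConverter` is the one converter that transforms its value rather than calling `toString()`: it renders the epoch-millis long as a millisecond-precision timestamp string, and returns null when the metadata entry is missing or not a Long. A rough standalone equivalent using `java.time` (pinned to UTC here so the output is deterministic, whereas the converter above uses `TimeZone.getDefault()`):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class TimestampReadSketch {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // Rough equivalent of TimestampConverter.read: format the "timestamp"
    // metadata entry with millisecond precision, or return null when it is
    // absent or not a Long (the column then stays NULL).
    static String read(Map<String, Object> metadata) {
        Object ts = metadata.get("timestamp");
        if (ts instanceof Long) {
            return FMT.format(Instant.ofEpochMilli((Long) ts));
        }
        return null;
    }

    public static void main(String[] args) {
        // 1640995200000 ms = 2022-01-01T00:00:00Z
        System.out.println(read(Map.<String, Object>of("timestamp", 1640995200000L)));
        System.out.println(read(Map.of())); // null
    }
}
```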
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index eb3332c731..a73929ac03 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -62,6 +62,8 @@ public class KafkaSyncDatabaseActionFactory
                        + "[--including_tables <table_name|name_regular_expr>] \\\n"
                        + "[--excluding_tables <table_name|name_regular_expr>] \\\n"
                        + "[--type_mapping <option1,option2...>] \\\n"
+                        + "[--metadata_column <metadata_column>] \\\n"
+                        + "[--metadata_column_prefix <metadata_column_prefix>] \\\n"
                        + "[--kafka_conf <kafka_source_conf> [--kafka_conf <kafka_source_conf> ...]] \\\n"
                        + "[--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \\\n"
                        + "[--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]");
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index 59976c9abb..3067fa279e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -60,6 +60,8 @@ public class KafkaSyncTableActionFactory extends SyncTableActionFactoryBase {
                         + "[--primary_keys <primary_keys>] \\\n"
                         + "[--type_mapping <option1,option2...>] \\\n"
                        + "[--computed_column <'column_name=expr_name(args[, ...])'> [--computed_column ...]] \\\n"
+                        + "[--metadata_column <metadata_column>] \\\n"
+                        + "[--metadata_column_prefix <metadata_column_prefix>] \\\n"
                        + "[--kafka_conf <kafka_source_conf> [--kafka_conf <kafka_source_conf> ...]] \\\n"
                        + "[--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \\\n"
                        + "[--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 312c323e6a..d513788468 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -326,6 +326,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         private final List<String> computedColumnArgs = new ArrayList<>();
         private final List<String> typeMappingModes = new ArrayList<>();
         private final List<String> metadataColumns = new ArrayList<>();
+        private String metadataColumnPrefix = null;
         private boolean syncPKeysFromSourceSchema = true;
 
         public SyncTableActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
@@ -372,6 +373,11 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
             return this;
         }
 
+        public SyncTableActionBuilder<T> withMetadataColumnPrefix(String prefix) {
+            this.metadataColumnPrefix = prefix;
+            return this;
+        }
+
         public SyncTableActionBuilder<T> syncPKeysFromSourceSchema(boolean flag) {
             this.syncPKeysFromSourceSchema = flag;
             return this;
@@ -400,6 +406,10 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
             args.addAll(listToArgs("--type-mapping", typeMappingModes));
 
             args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
+            if (metadataColumnPrefix != null) {
+                args.add("--metadata-column-prefix");
+                args.add(metadataColumnPrefix);
+            }
             args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
             args.add("--use_pkeys_from_source_for_paimon_schema");
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/KafkaMetadataE2ETest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/KafkaMetadataE2ETest.java
new file mode 100644
index 0000000000..2bb11e08e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/KafkaMetadataE2ETest.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * End-to-end unit test for Kafka metadata column support.
+ *
+ * <p>This test validates the complete flow from Kafka ConsumerRecord through deserialization,
+ * metadata extraction, and final Paimon row creation with metadata columns.
+ */
+public class KafkaMetadataE2ETest {
+
+    private static final String TEST_TOPIC = "test-topic";
+    private static final int TEST_PARTITION = 5;
+    private static final long TEST_OFFSET = 12345L;
+    private static final long TEST_TIMESTAMP = 1640995200000L;
+    private static final String TEST_TIMESTAMP_TYPE = "CreateTime";
+
+    @Test
+    public void testKafkaMetadataEndToEnd() throws Exception {
+        Map<String, Object> kafkaMetadata = createKafkaMetadata();
+        GenericRecord valueRecord = createDebeziumAvroRecord();
+        CdcSourceRecord cdcSourceRecord =
+                new CdcSourceRecord(TEST_TOPIC, null, valueRecord, kafkaMetadata);
+
+        assertThat(cdcSourceRecord.getMetadata()).isNotNull();
+        assertThat(cdcSourceRecord.getMetadata()).hasSize(5);
+        assertThat(cdcSourceRecord.getMetadata("topic")).isEqualTo(TEST_TOPIC);
+        assertThat(cdcSourceRecord.getMetadata("partition")).isEqualTo(TEST_PARTITION);
+        assertThat(cdcSourceRecord.getMetadata("offset")).isEqualTo(TEST_OFFSET);
+        assertThat(cdcSourceRecord.getMetadata("timestamp")).isEqualTo(TEST_TIMESTAMP);
+        assertThat(cdcSourceRecord.getMetadata("timestamp_type")).isEqualTo(TEST_TIMESTAMP_TYPE);
+
+        CdcMetadataConverter[] metadataConverters = createKafkaMetadataConverters();
+        DebeziumAvroRecordParser parser =
+                new DebeziumAvroRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        parser.withMetadataConverters(metadataConverters);
+
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        parser.flatMap(
+                cdcSourceRecord,
+                new org.apache.flink.util.Collector<RichCdcMultiplexRecord>() {
+                    @Override
+                    public void collect(RichCdcMultiplexRecord record) {
+                        records.add(record);
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+
+        assertThat(records).hasSize(1);
+        RichCdcMultiplexRecord richRecord = records.get(0);
+
+        org.apache.paimon.flink.sink.cdc.CdcSchema cdcSchema = richRecord.cdcSchema();
+        assertThat(cdcSchema.fields()).isNotEmpty();
+
+        assertThat(cdcSchema.fields().stream().anyMatch(f -> f.name().equals("topic"))).isTrue();
+        assertThat(cdcSchema.fields().stream().anyMatch(f -> f.name().equals("partition")))
+                .isTrue();
+        assertThat(cdcSchema.fields().stream().anyMatch(f -> f.name().equals("offset"))).isTrue();
+        assertThat(cdcSchema.fields().stream().anyMatch(f -> f.name().equals("timestamp")))
+                .isTrue();
+        assertThat(cdcSchema.fields().stream().anyMatch(f -> f.name().equals("timestamp_type")))
+                .isTrue();
+
+        Map<String, String> rowData = richRecord.toRichCdcRecord().toCdcRecord().data();
+        assertThat(rowData).containsKey("topic");
+        assertThat(rowData.get("topic")).isEqualTo(TEST_TOPIC);
+        assertThat(rowData).containsKey("partition");
+        assertThat(rowData.get("partition")).isEqualTo(String.valueOf(TEST_PARTITION));
+        assertThat(rowData).containsKey("offset");
+        assertThat(rowData.get("offset")).isEqualTo(String.valueOf(TEST_OFFSET));
+        assertThat(rowData).containsKey("timestamp");
+        assertThat(rowData.get("timestamp")).isNotNull();
+        assertThat(rowData).containsKey("timestamp_type");
+        assertThat(rowData.get("timestamp_type")).isEqualTo(TEST_TIMESTAMP_TYPE);
+
+        assertThat(rowData).containsKey("id");
+        assertThat(rowData.get("id")).isEqualTo("1");
+        assertThat(rowData).containsKey("name");
+        assertThat(rowData.get("name")).isEqualTo("test_user");
+    }
+
+    @Test
+    public void testMetadataConverterLookup() {
+        assertThat(CdcMetadataProcessor.converter(SyncJobHandler.SourceType.KAFKA, "topic"))
+                .isNotNull()
+                .isInstanceOf(KafkaMetadataConverter.TopicConverter.class);
+
+        assertThat(CdcMetadataProcessor.converter(SyncJobHandler.SourceType.KAFKA, "partition"))
+                .isNotNull()
+                .isInstanceOf(KafkaMetadataConverter.PartitionConverter.class);
+
+        assertThat(CdcMetadataProcessor.converter(SyncJobHandler.SourceType.KAFKA, "offset"))
+                .isNotNull()
+                .isInstanceOf(KafkaMetadataConverter.OffsetConverter.class);
+
+        assertThat(CdcMetadataProcessor.converter(SyncJobHandler.SourceType.KAFKA, "timestamp"))
+                .isNotNull()
+                .isInstanceOf(KafkaMetadataConverter.TimestampConverter.class);
+
+        assertThat(
+                        CdcMetadataProcessor.converter(
+                                SyncJobHandler.SourceType.KAFKA, "timestamp_type"))
+                .isNotNull()
+                .isInstanceOf(KafkaMetadataConverter.TimestampTypeConverter.class);
+
+        assertThatThrownBy(
+                        () ->
+                                CdcMetadataProcessor.converter(
+                                        SyncJobHandler.SourceType.KAFKA, "invalid_column"))
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    public void testPartialMetadata() throws Exception {
+        Map<String, Object> partialMetadata = new HashMap<>();
+        partialMetadata.put("topic", TEST_TOPIC);
+        partialMetadata.put("partition", TEST_PARTITION);
+
+        GenericRecord valueRecord = createDebeziumAvroRecord();
+        CdcSourceRecord cdcSourceRecord =
+                new CdcSourceRecord(TEST_TOPIC, null, valueRecord, partialMetadata);
+
+        CdcMetadataConverter[] metadataConverters = createKafkaMetadataConverters();
+        DebeziumAvroRecordParser parser =
+                new DebeziumAvroRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        parser.withMetadataConverters(metadataConverters);
+
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        parser.flatMap(
+                cdcSourceRecord,
+                new org.apache.flink.util.Collector<RichCdcMultiplexRecord>() {
+                    @Override
+                    public void collect(RichCdcMultiplexRecord record) {
+                        records.add(record);
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+
+        assertThat(records).hasSize(1);
+        RichCdcMultiplexRecord richRecord = records.get(0);
+
+        Map<String, String> rowData = richRecord.toRichCdcRecord().toCdcRecord().data();
+        assertThat(rowData.get("topic")).isEqualTo(TEST_TOPIC);
+        assertThat(rowData.get("partition")).isEqualTo(String.valueOf(TEST_PARTITION));
+        assertThat(rowData.get("offset")).isNull();
+        assertThat(rowData.get("timestamp")).isNull();
+        assertThat(rowData.get("timestamp_type")).isNull();
+    }
+
+    @Test
+    public void testMetadataWithoutConverters() throws Exception {
+        Map<String, Object> kafkaMetadata = createKafkaMetadata();
+        GenericRecord valueRecord = createDebeziumAvroRecord();
+        CdcSourceRecord cdcSourceRecord =
+                new CdcSourceRecord(TEST_TOPIC, null, valueRecord, kafkaMetadata);
+
+        DebeziumAvroRecordParser parser =
+                new DebeziumAvroRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        parser.flatMap(
+                cdcSourceRecord,
+                new org.apache.flink.util.Collector<RichCdcMultiplexRecord>() {
+                    @Override
+                    public void collect(RichCdcMultiplexRecord record) {
+                        records.add(record);
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+
+        assertThat(records).hasSize(1);
+        RichCdcMultiplexRecord richRecord = records.get(0);
+
+        Map<String, String> rowData = richRecord.toRichCdcRecord().toCdcRecord().data();
+        assertThat(rowData).doesNotContainKey("topic");
+        assertThat(rowData).doesNotContainKey("partition");
+        assertThat(rowData).doesNotContainKey("offset");
+
+        assertThat(rowData).containsKey("id");
+        assertThat(rowData).containsKey("name");
+    }
+
+    @Test
+    public void testAllMetadataConvertersDataTypes() {
+        KafkaMetadataConverter.TopicConverter topicConverter =
+                new KafkaMetadataConverter.TopicConverter();
+        assertThat(topicConverter.dataType()).isEqualTo(DataTypes.STRING());
+        assertThat(topicConverter.columnName()).isEqualTo("topic");
+
+        KafkaMetadataConverter.PartitionConverter partitionConverter =
+                new KafkaMetadataConverter.PartitionConverter();
+        assertThat(partitionConverter.dataType()).isEqualTo(DataTypes.INT());
+        assertThat(partitionConverter.columnName()).isEqualTo("partition");
+
+        KafkaMetadataConverter.OffsetConverter offsetConverter =
+                new KafkaMetadataConverter.OffsetConverter();
+        assertThat(offsetConverter.dataType()).isEqualTo(DataTypes.BIGINT());
+        assertThat(offsetConverter.columnName()).isEqualTo("offset");
+
+        KafkaMetadataConverter.TimestampConverter timestampConverter =
+                new KafkaMetadataConverter.TimestampConverter();
+        assertThat(timestampConverter.dataType())
+                .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        assertThat(timestampConverter.columnName()).isEqualTo("timestamp");
+
+        KafkaMetadataConverter.TimestampTypeConverter timestampTypeConverter =
+                new KafkaMetadataConverter.TimestampTypeConverter();
+        assertThat(timestampTypeConverter.dataType()).isEqualTo(DataTypes.STRING());
+        assertThat(timestampTypeConverter.columnName()).isEqualTo("timestamp_type");
+    }
+
+    private Map<String, Object> createKafkaMetadata() {
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("topic", TEST_TOPIC);
+        metadata.put("partition", TEST_PARTITION);
+        metadata.put("offset", TEST_OFFSET);
+        metadata.put("timestamp", TEST_TIMESTAMP);
+        metadata.put("timestamp_type", TEST_TIMESTAMP_TYPE);
+        return metadata;
+    }
+
+    private CdcMetadataConverter[] createKafkaMetadataConverters() {
+        return new CdcMetadataConverter[] {
+            new KafkaMetadataConverter.TopicConverter(),
+            new KafkaMetadataConverter.PartitionConverter(),
+            new KafkaMetadataConverter.OffsetConverter(),
+            new KafkaMetadataConverter.TimestampConverter(),
+            new KafkaMetadataConverter.TimestampTypeConverter()
+        };
+    }
+
+    private GenericRecord createDebeziumAvroRecord() {
+        Schema afterSchema =
+                SchemaBuilder.record("after")
+                        .fields()
+                        .name("id")
+                        .type()
+                        .intType()
+                        .noDefault()
+                        .name("name")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .endRecord();
+
+        Schema sourceSchema =
+                SchemaBuilder.record("source")
+                        .fields()
+                        .name("db")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .name("table")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .endRecord();
+
+        Schema envelopeSchema =
+                SchemaBuilder.record("envelope")
+                        .fields()
+                        .name("before")
+                        .type()
+                        .nullable()
+                        .record("before_record")
+                        .fields()
+                        .name("id")
+                        .type()
+                        .intType()
+                        .noDefault()
+                        .name("name")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .endRecord()
+                        .noDefault()
+                        .name("after")
+                        .type()
+                        .nullable()
+                        .record("after_record")
+                        .fields()
+                        .name("id")
+                        .type()
+                        .intType()
+                        .noDefault()
+                        .name("name")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .endRecord()
+                        .noDefault()
+                        .name("source")
+                        .type(sourceSchema)
+                        .noDefault()
+                        .name("op")
+                        .type()
+                        .stringType()
+                        .noDefault()
+                        .endRecord();
+
+        GenericRecord afterRecord = new GenericData.Record(afterSchema);
+        afterRecord.put("id", 1);
+        afterRecord.put("name", "test_user");
+
+        GenericRecord sourceRecord = new GenericData.Record(sourceSchema);
+        sourceRecord.put("db", "test_db");
+        sourceRecord.put("table", "test_table");
+
+        GenericRecord envelopeRecord = new GenericData.Record(envelopeSchema);
+        envelopeRecord.put("before", null);
+        envelopeRecord.put("after", afterRecord);
+        envelopeRecord.put("source", sourceRecord);
+        envelopeRecord.put("op", "c");
+
+        return envelopeRecord;
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
index 02ac86cdab..418979b068 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
@@ -74,6 +74,12 @@ public class KafkaAWSDMSSyncTableActionITCase extends KafkaSyncTableActionITCase
         testComputedColumn(AWSDMS);
     }
 
+    @Test
+    @Timeout(60)
+    public void testMetadataColumn() throws Exception {
+        testMetadataColumn(AWSDMS);
+    }
+
     @Test
     @Timeout(60)
     public void testFieldValNullSyncTable() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index eea2b6fa34..df48d9bd4e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -839,6 +839,12 @@ public class KafkaCanalSyncTableActionITCase extends KafkaSyncTableActionITCase
                 Arrays.asList("_id", "_year"));
     }
 
+    @Test
+    @Timeout(60)
+    public void testMetadataColumn() throws Exception {
+        testMetadataColumn(CANAL);
+    }
+
     @Test
     @Timeout(60)
     public void testTypeMappingToString() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 4305987536..2ba5bd82b1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -92,6 +92,12 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCa
         testComputedColumn(DEBEZIUM);
     }
 
+    @Test
+    @Timeout(60)
+    public void testMetadataColumn() throws Exception {
+        testMetadataColumn(DEBEZIUM);
+    }
+
     @Test
     @Timeout(60)
     public void testWaterMarkSyncTable() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index f15c06c5eb..15fe3783e4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -80,6 +80,12 @@ public class KafkaMaxwellSyncTableActionITCase extends KafkaSyncTableActionITCas
         testComputedColumn(MAXWELL);
     }
 
+    @Test
+    @Timeout(60)
+    public void testMetadataColumn() throws Exception {
+        testMetadataColumn(MAXWELL);
+    }
+
     @Test
     @Timeout(60)
     public void testWaterMarkSyncTable() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java
new file mode 100644
index 0000000000..cfc63100ba
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link KafkaMetadataConverter}. */
+public class KafkaMetadataConverterTest {
+
+    @Test
+    public void testTopicConverter() {
+        KafkaMetadataConverter.TopicConverter converter =
+                new KafkaMetadataConverter.TopicConverter();
+
+        // Test data type and column name
+        assertThat(converter.dataType()).isEqualTo(DataTypes.STRING());
+        assertThat(converter.columnName()).isEqualTo("topic");
+
+        // Test reading from CdcSourceRecord
+        CdcSourceRecord record = new CdcSourceRecord("test-topic", null, "value");
+        assertThat(converter.read(record)).isEqualTo("test-topic");
+
+        // Test with null topic
+        CdcSourceRecord recordWithNullTopic = new CdcSourceRecord(null, null, "value");
+        assertThat(converter.read(recordWithNullTopic)).isNull();
+
+        // Test JsonNode method throws exception
+        assertThatThrownBy(
+                        () ->
+                                converter.read(
+                                        (org.apache.paimon.shade.jackson2.com.fasterxml.jackson
+                                                        .databind.JsonNode)
+                                                null))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining(
+                        "Kafka metadata converters should be used with CdcSourceRecord");
+    }
+
+    @Test
+    public void testPartitionConverter() {
+        KafkaMetadataConverter.PartitionConverter converter =
+                new KafkaMetadataConverter.PartitionConverter();
+
+        // Test data type and column name
+        assertThat(converter.dataType()).isEqualTo(DataTypes.INT());
+        assertThat(converter.columnName()).isEqualTo("partition");
+
+        // Test reading from CdcSourceRecord with metadata
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("partition", 5);
+        CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata);
+        assertThat(converter.read(record)).isEqualTo("5");
+
+        // Test with missing partition metadata
+        CdcSourceRecord recordWithoutPartition = new CdcSourceRecord("topic", null, "value");
+        assertThat(converter.read(recordWithoutPartition)).isNull();
+    }
+
+    @Test
+    public void testOffsetConverter() {
+        KafkaMetadataConverter.OffsetConverter converter =
+                new KafkaMetadataConverter.OffsetConverter();
+
+        // Test data type and column name
+        assertThat(converter.dataType()).isEqualTo(DataTypes.BIGINT());
+        assertThat(converter.columnName()).isEqualTo("offset");
+
+        // Test reading from CdcSourceRecord with metadata
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("offset", 12345L);
+        CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata);
+        assertThat(converter.read(record)).isEqualTo("12345");
+
+        // Test with missing offset metadata
+        CdcSourceRecord recordWithoutOffset = new CdcSourceRecord("topic", null, "value");
+        assertThat(converter.read(recordWithoutOffset)).isNull();
+    }
+
+    @Test
+    public void testTimestampConverter() {
+        KafkaMetadataConverter.TimestampConverter converter =
+                new KafkaMetadataConverter.TimestampConverter();
+
+        // Test data type and column name
+        assertThat(converter.dataType()).isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        assertThat(converter.columnName()).isEqualTo("timestamp");
+
+        // Test reading from CdcSourceRecord with metadata
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("timestamp", 1640995200000L); // 2022-01-01 00:00:00 UTC
+        CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata);
+        String result = converter.read(record);
+        assertThat(result).isNotNull();
+        // Result depends on system timezone, just verify it's a valid timestamp string
+        assertThat(result).matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+
+        // Test with missing timestamp metadata
+        CdcSourceRecord recordWithoutTimestamp = new CdcSourceRecord("topic", null, "value");
+        assertThat(converter.read(recordWithoutTimestamp)).isNull();
+
+        // Test with non-Long timestamp
+        Map<String, Object> invalidMetadata = new HashMap<>();
+        invalidMetadata.put("timestamp", "not-a-long");
+        CdcSourceRecord recordWithInvalidTimestamp =
+                new CdcSourceRecord("topic", null, "value", invalidMetadata);
+        assertThat(converter.read(recordWithInvalidTimestamp)).isNull();
+    }
+
+    @Test
+    public void testTimestampTypeConverter() {
+        KafkaMetadataConverter.TimestampTypeConverter converter =
+                new KafkaMetadataConverter.TimestampTypeConverter();
+
+        // Test data type and column name
+        assertThat(converter.dataType()).isEqualTo(DataTypes.STRING());
+        assertThat(converter.columnName()).isEqualTo("timestamp_type");
+
+        // Test reading from CdcSourceRecord with metadata
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("timestamp_type", "CreateTime");
+        CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata);
+        assertThat(converter.read(record)).isEqualTo("CreateTime");
+
+        // Test with LogAppendTime
+        metadata.put("timestamp_type", "LogAppendTime");
+        CdcSourceRecord recordLogAppend = new CdcSourceRecord("topic", null, "value", metadata);
+        assertThat(converter.read(recordLogAppend)).isEqualTo("LogAppendTime");
+
+        // Test with NoTimestampType
+        metadata.put("timestamp_type", "NoTimestampType");
+        CdcSourceRecord recordNoTimestamp = new CdcSourceRecord("topic", null, "value", metadata);
+        assertThat(converter.read(recordNoTimestamp)).isEqualTo("NoTimestampType");
+
+        // Test with missing timestamp_type metadata
+        CdcSourceRecord recordWithoutTimestampType = new CdcSourceRecord("topic", null, "value");
+        assertThat(converter.read(recordWithoutTimestampType)).isNull();
+    }
+
+    @Test
+    public void testAllConvertersWithCompleteMetadata() {
+        // Create a CdcSourceRecord with all Kafka metadata
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("topic", "my-topic");
+        metadata.put("partition", 3);
+        metadata.put("offset", 9876L);
+        metadata.put("timestamp", 1640995200000L);
+        metadata.put("timestamp_type", "CreateTime");
+
+        CdcSourceRecord record = new CdcSourceRecord("my-topic", "key", "value", metadata);
+
+        // Test all converters
+        KafkaMetadataConverter.TopicConverter topicConverter =
+                new KafkaMetadataConverter.TopicConverter();
+        KafkaMetadataConverter.PartitionConverter partitionConverter =
+                new KafkaMetadataConverter.PartitionConverter();
+        KafkaMetadataConverter.OffsetConverter offsetConverter =
+                new KafkaMetadataConverter.OffsetConverter();
+        KafkaMetadataConverter.TimestampConverter timestampConverter =
+                new KafkaMetadataConverter.TimestampConverter();
+        KafkaMetadataConverter.TimestampTypeConverter timestampTypeConverter =
+                new KafkaMetadataConverter.TimestampTypeConverter();
+
+        assertThat(topicConverter.read(record)).isEqualTo("my-topic");
+        assertThat(partitionConverter.read(record)).isEqualTo("3");
+        assertThat(offsetConverter.read(record)).isEqualTo("9876");
+        assertThat(timestampConverter.read(record)).isNotNull();
+        assertThat(timestampTypeConverter.read(record)).isEqualTo("CreateTime");
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 6394b5b719..dc6ed9b9c8 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -80,6 +80,12 @@ public class KafkaOggSyncTableActionITCase extends KafkaSyncTableActionITCase {
         testComputedColumn(OGG);
     }
 
+    @Test
+    @Timeout(60)
+    public void testMetadataColumn() throws Exception {
+        testMetadataColumn(OGG);
+    }
+
     @Test
     @Timeout(60)
     public void testCDCOperations() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index f5b6bb5923..596c6d2cc6 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -535,6 +535,60 @@ public class KafkaSyncTableActionITCase extends KafkaActionITCaseBase {
                 Arrays.asList("_id", "_year"));
     }
 
+    public void testMetadataColumn(String format) throws Exception {
+        String topic = "metadata_column";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/%s/table/metadatacolumn/%s-data-1.txt", format, format);
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPartitionKeys("_year")
+                        .withPrimaryKeys("_id", "_year")
+                        .withComputedColumnArgs("_year=year(_date)")
+                        .withMetadataColumnPrefix("__kafka_")
+                        .withMetadataColumns(
+                                "topic", "offset", "partition", "timestamp", "timestamp_type")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // Column order matches the order specified in withMetadataColumns above
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.BIGINT(),
+                            DataTypes.INT(),
+                            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+                            DataTypes.STRING()
+                        },
+                        new String[] {
+                            "_id",
+                            "_date",
+                            "_year",
+                            "__kafka_topic",
+                            "__kafka_offset",
+                            "__kafka_partition",
+                            "__kafka_timestamp",
+                            "__kafka_timestamp_type"
+                        });
+
+        // Use regex matching because offset and timestamp values are unpredictable
+        waitForResult(
+                true,
+                Collections.singletonList(
+                        "\\+I\\[101, 2023-03-23, 2023, metadata_column, 0, 0, .+, .+\\]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Arrays.asList("_id", "_year"));
+    }
+
     protected void testCDCOperations(String format) throws Exception {
         String topic = "event";
         createTestTopic(topic, 1, 1);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/metadatacolumn/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/metadatacolumn/aws-dms-data-1.txt
new file mode 100644
index 0000000000..cf9112abc4
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/metadatacolumn/aws-dms-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"_id":101,"_date":"2023-03-23"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/metadatacolumn/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/metadatacolumn/canal-data-1.txt
new file mode 100644
index 0000000000..311e89dc46
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/metadatacolumn/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"_id":"101","_date":"2023-03-23"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"old":null,"pkNames":["_id"],"sql":"","table":"test_metadata_column","ts":1683006706728,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt
new file mode 100644
index 0000000000..fce341e17d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"_id":101,"_date":"2023-03-23"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/metadatacolumn/maxwell-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/metadatacolumn/maxwell-data-1.txt
new file mode 100644
index 0000000000..d31d26ffb5
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/metadatacolumn/maxwell-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"_id":101,"_date":"2023-03-23"},"primary_key_columns": ["_id"]}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/metadatacolumn/ogg-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/metadatacolumn/ogg-data-1.txt
new file mode 100644
index 0000000000..7dead51efb
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/metadatacolumn/ogg-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","primary_keys":["_id"],"after":{"_id":101,"_date":"2023-03-23"},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}