This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e4ec14a958 [Feature][Kafka source] Inject Kafka record timestamp as 
EventTime metadata (#9994)
e4ec14a958 is described below

commit e4ec14a9581358b67482c302ea867252c6839843
Author: Adam Wang <[email protected]>
AuthorDate: Mon Dec 1 14:21:10 2025 +0800

    [Feature][Kafka source] Inject Kafka record timestamp as EventTime metadata 
(#9994)
    
    Co-authored-by: wangxiaogang <[email protected]>
---
 docs/en/connector-v2/source/Kafka.md               |  32 ++-
 docs/en/transform-v2/metadata.md                   |  73 ++++++-
 docs/zh/connector-v2/source/Kafka.md               |  34 +++-
 docs/zh/transform-v2/metadata.md                   |  75 ++++++-
 .../KafkaEventTimeDeserializationSchema.java       | 122 ++++++++++++
 .../seatunnel/kafka/source/KafkaRecordEmitter.java |   4 +
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 219 +++++++++++++--------
 .../kafka/source/KafkaRecordEmitterTest.java       | 165 ++++++++++++++++
 .../kafka/source/KafkaSourceConfigTest.java        |  11 +-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  30 +++
 ...afka_source_text_with_event_time_to_assert.conf |  80 ++++++++
 ...ompatibleKafkaConnectDeserializationSchema.java |  14 ++
 .../NativeKafkaConnectDeserializationSchema.java   |  13 ++
 13 files changed, 774 insertions(+), 98 deletions(-)

diff --git a/docs/en/connector-v2/source/Kafka.md 
b/docs/en/connector-v2/source/Kafka.md
index fcd0d21f3e..9c0363c89e 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -75,6 +75,36 @@ debezium_record_table_filter {
 
 Only the data of the `test.public.products` table will be consumed.
 
+## Metadata Support
+
+The Kafka source automatically injects `ConsumerRecord.timestamp` into the 
SeaTunnel `EventTime` metadata when the value is non-negative. You can expose 
it as a normal field through the [Metadata 
transform](../../transform-v2/metadata.md) for downstream SQL or partitioning.
+
+```hocon
+source {
+  Kafka {
+    plugin_output = "kafka_raw"
+    topic = "seatunnel_topic"
+    bootstrap.servers = "localhost:9092"
+    format = json
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "kafka_raw"
+    plugin_output = "kafka_with_meta"
+    metadata_fields {
+      EventTime = kafka_ts # kafka_ts will contain ConsumerRecord.timestamp 
(ms)
+    }
+  }
+  Sql {
+    plugin_input = "kafka_with_meta"
+    plugin_output = "kafka_enriched"
+    query = "select *, FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 
'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+  }
+}
+```
+
 ## Task Example
 
 ### Simple
@@ -430,4 +460,4 @@ Note:key/value is of type byte[].
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index be4be03c17..1a6ecf1ad4 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -24,14 +24,15 @@ The Metadata transform plugin is used to extract metadata 
information from data
 | Database  |  string  |  Name of the database containing the data  | All 
connectors |
 |   Table   |  string  |  Name of the table containing the data  | All 
connectors |
 |  RowKind  |  string  |  Row change type, values: +I (insert), -U (update 
before), +U (update after), -D (delete)  | All connectors |
-| EventTime |   long   |  Event timestamp of data change (milliseconds)  | CDC 
connectors |
+| EventTime |   long   |  Event timestamp of data change (milliseconds)  | CDC 
connectors; Kafka source (ConsumerRecord.timestamp) |
 |   Delay   |   long   |  Data collection delay time (milliseconds), i.e., the 
difference between data extraction time and database change time  | CDC 
connectors |
 | Partition |  string  |  Partition information of the data, multiple 
partition fields separated by commas  | Connectors supporting partitions |
 
 ### Important Notes
 
 1. **Metadata field names are case-sensitive**: Configuration must strictly 
follow the Key names in the table above (e.g., `Database`, `Table`, `RowKind`, 
etc.)
-2. **CDC-specific fields**: `EventTime` and `Delay` are only valid when using 
CDC connectors (except TiDB-CDC)
+2. **Time fields**: `Delay` is only valid when using CDC connectors (except 
TiDB-CDC). `EventTime` is provided by CDC connectors and also by the Kafka 
source via `ConsumerRecord.timestamp` when available.
+3. **Kafka event time**: The Kafka source writes `ConsumerRecord.timestamp` 
(milliseconds) into `EventTime` when it is non-negative, so you can surface it 
with the `Metadata` transform.
 
 ## Options
 
@@ -171,3 +172,71 @@ sink {
   }
 }
 ```
+
+### Example 3: Kafka record time for partitioning
+
+Expose Kafka `ConsumerRecord.timestamp` (injected into `EventTime`) as 
`kafka_ts`, convert it to a partition field, and write to Hive. This pattern is 
useful when replaying Kafka data and aligning partitions by the original record 
time.
+
+```hocon
+env {
+  execution.parallelism = 4
+  job.mode = "STREAMING"
+  checkpoint.interval = 60000
+}
+
+source {
+  Kafka {
+    plugin_output = "kafka_raw"
+    schema = {
+      fields {
+        id = bigint
+        customer_type = string
+        data = string
+      }
+    }
+    format = text
+    field_delimiter = "|"
+    topic = "push_report_event"
+    bootstrap.servers = "kafka-broker-1:9092,kafka-broker-2:9092"
+    consumer.group = "seatunnel_event_backfill"
+    kafka.config = {
+      max.poll.records = 100
+      auto.offset.reset = "earliest"
+      enable.auto.commit = "false"
+    }
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "kafka_raw"
+    plugin_output = "kafka_with_meta"
+    metadata_fields = {
+      EventTime = "kafka_ts"
+    }
+  }
+
+  Sql {
+    plugin_input = "kafka_with_meta"
+    plugin_output = "source_table"
+    query = "select id, customer_type, data, FROM_UNIXTIME(kafka_ts/1000, 
'yyyy-MM-dd', 'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+  }
+}
+
+sink {
+  Hive {
+    table_name = "example_db.ods_sys_event_report"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+    hdfs_site_path = "/path/to/hdfs-site.xml"
+    hive_site_path = "/path/to/hive-site.xml"
+    krb5_path = "/path/to/krb5.conf"
+    kerberos_principal = "hive/[email protected]"
+    kerberos_keytab_path = "/path/to/hive.keytab"
+    overwrite = false
+    plugin_input = "source_table"
+    # compress_codec = "SNAPPY"
+  }
+}
+```
+
+Here `pt` is derived from the Kafka event time and can be used as a Hive 
partition column.
diff --git a/docs/zh/connector-v2/source/Kafka.md 
b/docs/zh/connector-v2/source/Kafka.md
index 643b584cc3..0f16ef3cf4 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -69,12 +69,42 @@ import ChangeLog from '../changelog/connector-kafka.md';
 debezium_record_table_filter {
   database_name = "test"
   schema_name = "public" // null 如果不存在
-  table_name = "products"
+  table_name = "products"
 }
 ```
 
 只有 `test.public.products` 表的数据将被消费。
 
+## 元数据支持
+
+Kafka 源会在 `ConsumerRecord.timestamp` 大于等于 0 时,将其自动写入 SeaTunnel 行的 `EventTime` 
元数据。可以借助 [Metadata 转换](../../transform-v2/metadata.md) 把这段时间戳暴露为普通字段,方便做分区或下游 
SQL 处理。
+
+```hocon
+source {
+  Kafka {
+    plugin_output = "kafka_raw"
+    topic = "seatunnel_topic"
+    bootstrap.servers = "localhost:9092"
+    format = json
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "kafka_raw"
+    plugin_output = "kafka_with_meta"
+    metadata_fields {
+      EventTime = kafka_ts # ConsumerRecord.timestamp (ms)
+    }
+  }
+  Sql {
+    plugin_input = "kafka_with_meta"
+    plugin_output = "kafka_enriched"
+    query = "select *, FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 
'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+  }
+}
+```
+
 ## 任务示例
 
 ### 简单示例
@@ -423,4 +453,4 @@ source {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 4a714901a7..1567fd20ee 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -24,14 +24,15 @@ Metadata 转换插件用于将数据行中的元数据信息提取并转换为
 | Database  |  string  |  数据所属的数据库名称  | 所有连接器 |
 |   Table   |  string  |  数据所属的表名称  | 所有连接器 |
 |  RowKind  |  string  |  行的变更类型,值为:+I(插入)、-U(更新前)、+U(更新后)、-D(删除)  | 所有连接器 |
-| EventTime |   long   |  数据变更的事件时间戳(毫秒)  | CDC 连接器 |
+| EventTime | long   | 数据变更的事件时间戳(毫秒) | CDC 连接器;Kafka 
源(ConsumerRecord.timestamp) |
 |   Delay   |   long   |  数据采集延迟时间(毫秒),即数据抽取时间与数据库变更时间的差值  | CDC 连接器 |
 | Partition |  string  |  数据所属的分区信息,多个分区字段使用逗号分隔  | 支持分区的连接器 |
 
 ### 重要说明
 
-1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)
-2. **CDC 专有字段**:`EventTime` 和 `Delay` 仅在使用 CDC 连接器时有效(TiDB-CDC 除外)
+1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)。
+2. **时间相关字段**:`Delay` 仅在 CDC 连接器有效(TiDB-CDC 除外);`EventTime` 由 CDC 连接器写入,也会在 
Kafka 源中使用 `ConsumerRecord.timestamp`(毫秒,非负时)写入。
+3. **Kafka 事件时间**:Kafka 源会在 `ConsumerRecord.timestamp` 非负时写入 `EventTime`,可通过 
Metadata 转换将其暴露为普通字段。
 
 ## 配置选项
 
@@ -171,3 +172,71 @@ sink {
   }
 }
 ```
+
+### 示例 3:Kafka 写入时间用于分区
+
+将 Kafka `ConsumerRecord.timestamp`(写入到 `EventTime` 元数据)暴露为普通字段,再生成分区字段并写入 
Hive,适合回放或补数场景。
+
+```hocon
+env {
+  execution.parallelism = 4
+  job.mode = "STREAMING"
+  checkpoint.interval = 60000
+}
+
+source {
+  Kafka {
+    plugin_output = "kafka_raw"
+    schema = {
+      fields {
+        id = bigint
+        customer_type = string
+        data = string
+      }
+    }
+    format = text
+    field_delimiter = "|"
+    topic = "push_report_event"
+    bootstrap.servers = "kafka-broker-1:9092,kafka-broker-2:9092"
+    consumer.group = "seatunnel_event_backfill"
+    kafka.config = {
+      max.poll.records = 100
+      auto.offset.reset = "earliest"
+      enable.auto.commit = "false"
+    }
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "kafka_raw"
+    plugin_output = "kafka_with_meta"
+    metadata_fields = {
+      EventTime = "kafka_ts"
+    }
+  }
+
+  Sql {
+    plugin_input = "kafka_with_meta"
+    plugin_output = "source_table"
+    query = "select id, customer_type, data, FROM_UNIXTIME(kafka_ts/1000, 
'yyyy-MM-dd', 'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+  }
+}
+
+sink {
+  Hive {
+    table_name = "example_db.ods_sys_event_report"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+    hdfs_site_path = "/path/to/hdfs-site.xml"
+    hive_site_path = "/path/to/hive-site.xml"
+    krb5_path = "/path/to/krb5.conf"
+    kerberos_principal = "hive/[email protected]"
+    kerberos_keytab_path = "/path/to/hive.keytab"
+    overwrite = false
+    plugin_input = "source_table"
+    # compress_codec = "SNAPPY"
+  }
+}
+```
+
+上面的 `pt` 字段由 Kafka 事件时间转换而来,可在 Hive 中作为分区列使用,便于补数和校准分区。
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java
new file mode 100644
index 0000000000..9916f02748
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+
+/**
+ * A {@link DeserializationSchema} wrapper that attaches the Kafka record timestamp as
+ * {@code CommonOptions.EVENT_TIME} metadata to emitted {@link SeaTunnelRow}s.
+ *
+ * <p>Set the timestamp with {@link #setCurrentRecordTimestamp(Long)} before each deserialize call.
+ * It is applied only when it is non-null and non-negative and the row has no EventTime option yet.
+ */
+public class KafkaEventTimeDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
+
+    private final DeserializationSchema<SeaTunnelRow> delegate; // wrapped schema that does the actual decoding
+
+    private Long currentRecordTimestamp; // per-record state, null until set; NOTE(review): presumably set and read by one thread - confirm
+
+    public 
KafkaEventTimeDeserializationSchema(DeserializationSchema<SeaTunnelRow> 
delegate) {
+        this.delegate = delegate;
+    }
+
+    public DeserializationSchema<SeaTunnelRow> getDelegate() { // exposes the wrapped schema
+        return delegate;
+    }
+
+    public void setCurrentRecordTimestamp(Long timestamp) { // KafkaRecordEmitter passes consumerRecord.timestamp() here before deserializing
+        this.currentRecordTimestamp = timestamp;
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException { // single-row path: decode, then stamp the row if one was produced
+        SeaTunnelRow row = delegate.deserialize(message);
+        if (row == null) { // delegate produced no row, so there is nothing to stamp
+            return null;
+        }
+        attachEventTime(row);
+        return row;
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) 
throws IOException {
+        delegate.deserialize( // collector path: hand the delegate a wrapper around out so each emitted row is stamped first
+                message,
+                new Collector<SeaTunnelRow>() {
+                    @Override
+                    public void collect(SeaTunnelRow record) { // stamp the row, then forward it to the real collector
+                        attachEventTime(record);
+                        out.collect(record);
+                    }
+
+                    @Override
+                    public void markSchemaChangeBeforeCheckpoint() { // this and the remaining overrides forward unchanged to out
+                        out.markSchemaChangeBeforeCheckpoint();
+                    }
+
+                    @Override
+                    public void collect(
+                            
org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent event) {
+                        out.collect(event);
+                    }
+
+                    @Override
+                    public void markSchemaChangeAfterCheckpoint() {
+                        out.markSchemaChangeAfterCheckpoint();
+                    }
+
+                    @Override
+                    public Object getCheckpointLock() {
+                        return out.getCheckpointLock();
+                    }
+
+                    @Override
+                    public boolean isEmptyThisPollNext() {
+                        return out.isEmptyThisPollNext();
+                    }
+
+                    @Override
+                    public void resetEmptyThisPollNext() {
+                        out.resetEmptyThisPollNext();
+                    }
+                });
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() { // produced type is whatever the delegate reports
+        return delegate.getProducedType();
+    }
+
+    private void attachEventTime(SeaTunnelRow row) { // writes currentRecordTimestamp into the row's EventTime option when applicable
+        if (row == null || currentRecordTimestamp == null || 
currentRecordTimestamp < 0) {
+            return; // no row, or no usable (non-null, non-negative) timestamp
+        }
+        Object existing = 
row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+        if (existing == null) { // keep an EventTime that is already present on the row
+            MetadataUtil.setEventTime(row, currentRecordTimestamp);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index 1e3811ba49..66810cf33c 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -61,6 +61,10 @@ public class KafkaRecordEmitter
         // todo there is an additional loss in this place for non-multi-table 
scenarios
         DeserializationSchema<SeaTunnelRow> deserializationSchema =
                 
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
+        if (deserializationSchema instanceof 
KafkaEventTimeDeserializationSchema) {
+            ((KafkaEventTimeDeserializationSchema) deserializationSchema)
+                    .setCurrentRecordTimestamp(consumerRecord.timestamp());
+        }
         try {
             if (deserializationSchema instanceof 
CompatibleKafkaConnectDeserializationSchema) {
                 ((CompatibleKafkaConnectDeserializationSchema) 
deserializationSchema)
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 97e5451ce5..ce2c38cbb0 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -25,6 +25,8 @@ import 
org.apache.seatunnel.api.options.table.TableIdentifierOptions;
 import org.apache.seatunnel.api.options.table.TableSchemaOptions;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -266,20 +268,38 @@ public class KafkaSourceConfig implements Serializable {
         }
         TablePath tablePath = getTablePathFromSchema(readonlyConfig, 
readonlyConfig.get(TOPIC));
 
-        return CatalogTable.of(
-                TableIdentifier.of("", tablePath),
-                tableSchema,
-                new HashMap<String, String>() {
-                    {
-                        
Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
-                                .ifPresent(value -> 
put(PROTOBUF_MESSAGE_NAME.key(), value));
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("", tablePath),
+                        tableSchema,
+                        new HashMap<String, String>() {
+                            {
+                                
Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
+                                        .ifPresent(
+                                                value -> 
put(PROTOBUF_MESSAGE_NAME.key(), value));
 
-                        
Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
-                                .ifPresent(value -> put(PROTOBUF_SCHEMA.key(), 
value));
-                    }
-                },
-                Collections.emptyList(),
-                null);
+                                
Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
+                                        .ifPresent(value -> 
put(PROTOBUF_SCHEMA.key(), value));
+                            }
+                        },
+                        Collections.emptyList(),
+                        null);
+
+        // Expose Kafka record timestamp as metadata 'EventTime' for Metadata 
transform
+        MetadataSchema metadataSchema =
+                MetadataSchema.builder()
+                        .column(
+                                MetadataColumn.of(
+                                        
org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME
+                                                .getName(),
+                                        BasicType.LONG_TYPE,
+                                        0L,
+                                        true,
+                                        null,
+                                        null))
+                        .build();
+
+        return CatalogTable.withMetadata(catalogTable, metadataSchema);
     }
 
     private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, 
String topicName) {
@@ -299,85 +319,110 @@ public class KafkaSourceConfig implements Serializable {
         SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         MessageFormat format = readonlyConfig.get(FORMAT);
 
+        DeserializationSchema<SeaTunnelRow> schema;
+
         if (format == MessageFormat.NATIVE) {
-            return new NativeKafkaConnectDeserializationSchema(
-                    catalogTable, false, false, false, false);
+            schema =
+                    new NativeKafkaConnectDeserializationSchema(
+                            catalogTable, false, false, false, false);
+        } else if 
(!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+            schema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(seaTunnelRowType)
+                            .delimiter(TextFormatConstant.PLACEHOLDER)
+                            .setCatalogTable(catalogTable)
+                            .build();
+        } else {
+            switch (format) {
+                case JSON:
+                    schema = new JsonDeserializationSchema(catalogTable, 
false, false);
+                    break;
+                case TEXT:
+                    String delimiter = readonlyConfig.get(FIELD_DELIMITER);
+                    schema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(seaTunnelRowType)
+                                    .delimiter(delimiter)
+                                    .build();
+                    break;
+                case CANAL_JSON:
+                    schema =
+                            
CanalJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case OGG_JSON:
+                    schema =
+                            OggJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case MAXWELL_JSON:
+                    schema =
+                            
MaxWellJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case COMPATIBLE_KAFKA_CONNECT_JSON:
+                    Boolean keySchemaEnable =
+                            readonlyConfig.get(
+                                    
KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
+                    Boolean valueSchemaEnable =
+                            readonlyConfig.get(
+                                    
KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
+                    schema =
+                            new CompatibleKafkaConnectDeserializationSchema(
+                                    catalogTable, keySchemaEnable, 
valueSchemaEnable, false, false);
+                    break;
+                case DEBEZIUM_JSON:
+                    boolean includeSchema = 
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
+                    TableIdentifierConfig tableFilter =
+                            readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
+                    if (tableFilter != null) {
+                        TablePath tablePath =
+                                TablePath.of(
+                                        
StringUtils.isNotEmpty(tableFilter.getDatabaseName())
+                                                ? tableFilter.getDatabaseName()
+                                                : null,
+                                        
StringUtils.isNotEmpty(tableFilter.getSchemaName())
+                                                ? tableFilter.getSchemaName()
+                                                : null,
+                                        
StringUtils.isNotEmpty(tableFilter.getTableName())
+                                                ? tableFilter.getTableName()
+                                                : null);
+                        Map<TablePath, DebeziumJsonDeserializationSchema> 
tableDeserializationMap =
+                                Collections.singletonMap(
+                                        tablePath,
+                                        new DebeziumJsonDeserializationSchema(
+                                                catalogTable, true, 
includeSchema));
+                        schema =
+                                new 
DebeziumJsonDeserializationSchemaDispatcher(
+                                        tableDeserializationMap, true, 
includeSchema);
+                    } else {
+                        schema =
+                                new DebeziumJsonDeserializationSchema(
+                                        catalogTable, true, includeSchema);
+                    }
+                    break;
+                case AVRO:
+                    schema = new AvroDeserializationSchema(catalogTable);
+                    break;
+                case PROTOBUF:
+                    schema = new ProtobufDeserializationSchema(catalogTable);
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported format: " + format);
+            }
         }
 
-        if 
(!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
-            return TextDeserializationSchema.builder()
-                    .seaTunnelRowType(seaTunnelRowType)
-                    .delimiter(TextFormatConstant.PLACEHOLDER)
-                    .setCatalogTable(catalogTable)
-                    .build();
+        if (schema instanceof NativeKafkaConnectDeserializationSchema
+                || schema instanceof 
CompatibleKafkaConnectDeserializationSchema) {
+            return schema;
         }
 
-        switch (format) {
-            case JSON:
-                return new JsonDeserializationSchema(catalogTable, false, 
false);
-            case TEXT:
-                String delimiter = readonlyConfig.get(FIELD_DELIMITER);
-                return TextDeserializationSchema.builder()
-                        .seaTunnelRowType(seaTunnelRowType)
-                        .delimiter(delimiter)
-                        .build();
-            case CANAL_JSON:
-                return CanalJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-            case OGG_JSON:
-                return OggJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-            case MAXWELL_JSON:
-                return MaxWellJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-
-            case COMPATIBLE_KAFKA_CONNECT_JSON:
-                Boolean keySchemaEnable =
-                        readonlyConfig.get(
-                                
KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
-                Boolean valueSchemaEnable =
-                        readonlyConfig.get(
-                                
KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
-                return new CompatibleKafkaConnectDeserializationSchema(
-                        catalogTable, keySchemaEnable, valueSchemaEnable, 
false, false);
-            case DEBEZIUM_JSON:
-                boolean includeSchema = 
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
-                TableIdentifierConfig tableFilter =
-                        readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
-                if (tableFilter != null) {
-                    TablePath tablePath =
-                            TablePath.of(
-                                    
StringUtils.isNotEmpty(tableFilter.getDatabaseName())
-                                            ? tableFilter.getDatabaseName()
-                                            : null,
-                                    
StringUtils.isNotEmpty(tableFilter.getSchemaName())
-                                            ? tableFilter.getSchemaName()
-                                            : null,
-                                    
StringUtils.isNotEmpty(tableFilter.getTableName())
-                                            ? tableFilter.getTableName()
-                                            : null);
-                    Map<TablePath, DebeziumJsonDeserializationSchema> 
tableDeserializationMap =
-                            Collections.singletonMap(
-                                    tablePath,
-                                    new DebeziumJsonDeserializationSchema(
-                                            catalogTable, true, 
includeSchema));
-                    return new DebeziumJsonDeserializationSchemaDispatcher(
-                            tableDeserializationMap, true, includeSchema);
-                } else {
-                    return new DebeziumJsonDeserializationSchema(catalogTable, 
true, includeSchema);
-                }
-            case AVRO:
-                return new AvroDeserializationSchema(catalogTable);
-            case PROTOBUF:
-                return new ProtobufDeserializationSchema(catalogTable);
-            default:
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        "Unsupported format: " + format);
-        }
+        return new KafkaEventTimeDeserializationSchema(schema);
     }
 
     private TableSchema nativeTableSchema() {
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
new file mode 100644
index 0000000000..14fd630304
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link KafkaRecordEmitter} when the table's deserialization schema is wrapped in
+ * {@link KafkaEventTimeDeserializationSchema}.
+ *
+ * <p>Each test emits one mocked Kafka record and checks two things: whether the emitted row's
+ * options carry the record timestamp under {@code CommonOptions.EVENT_TIME}, and that the split
+ * state's current offset equals the record offset plus one.
+ */
+class KafkaRecordEmitterTest {
+
+    @Test
+    void emitRecordShouldAttachKafkaTimestampAsEventTime() throws Exception {
+        long kafkaTimestamp = 1690000000000L;
+        TablePath tablePath = TablePath.DEFAULT;
+        KafkaSourceSplitState splitState = newSplitState(tablePath, "t", 0);
+        List<SeaTunnelRow> out = new ArrayList<>();
+
+        newEmitter(tablePath)
+                .emitRecord(
+                        mockRecord(kafkaTimestamp, "hello", 100L),
+                        new TestCollector(out),
+                        splitState);
+
+        Assertions.assertEquals(1, out.size());
+        SeaTunnelRow row = out.get(0);
+        Object eventTime = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+        Assertions.assertEquals(kafkaTimestamp, eventTime);
+
+        // Also verify split state offset advanced
+        Assertions.assertEquals(101L, splitState.getCurrentOffset());
+    }
+
+    @Test
+    void emitRecordShouldNotAttachEventTimeWhenTimestampNegative() throws Exception {
+        long kafkaTimestamp = -1L; // invalid timestamp
+        TablePath tablePath = TablePath.DEFAULT;
+        KafkaSourceSplitState splitState = newSplitState(tablePath, "t2", 1);
+        List<SeaTunnelRow> out = new ArrayList<>();
+
+        newEmitter(tablePath)
+                .emitRecord(
+                        mockRecord(kafkaTimestamp, "world", 5L),
+                        new TestCollector(out),
+                        splitState);
+
+        Assertions.assertEquals(1, out.size());
+        SeaTunnelRow row = out.get(0);
+        Assertions.assertFalse(row.getOptions().containsKey(CommonOptions.EVENT_TIME.getName()));
+        Assertions.assertEquals(6L, splitState.getCurrentOffset());
+    }
+
+    /**
+     * Builds an emitter whose metadata map contains a single table. That table deserializes
+     * messages into one-column string rows through the event-time wrapper.
+     */
+    private static KafkaRecordEmitter newEmitter(TablePath tablePath) {
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"f0"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
+        DeserializationSchema<SeaTunnelRow> schema =
+                new KafkaEventTimeDeserializationSchema(new SimpleStringRowSchema(rowType));
+
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setDeserializationSchema(schema);
+        Map<TablePath, ConsumerMetadata> metadataByTable = new HashMap<>();
+        metadataByTable.put(tablePath, metadata);
+
+        return new KafkaRecordEmitter(metadataByTable, MessageFormatErrorHandleWay.FAIL);
+    }
+
+    /** Mocks a consumer record that reports the given timestamp, UTF-8 value and offset. */
+    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot carry the generic parameters
+    private static org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> mockRecord(
+            long timestamp, String value, long offset) {
+        org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> record =
+                Mockito.mock(org.apache.kafka.clients.consumer.ConsumerRecord.class);
+        Mockito.when(record.timestamp()).thenReturn(timestamp);
+        Mockito.when(record.value()).thenReturn(value.getBytes(StandardCharsets.UTF_8));
+        Mockito.when(record.offset()).thenReturn(offset);
+        return record;
+    }
+
+    /** Creates the state for a fresh split of the given topic partition. */
+    private static KafkaSourceSplitState newSplitState(
+            TablePath tablePath, String topic, int partition) {
+        return new KafkaSourceSplitState(
+                new KafkaSourceSplit(tablePath, new TopicPartition(topic, partition)));
+    }
+
+    /** Minimal schema that turns the message bytes into a row with one UTF-8 string field. */
+    private static class SimpleStringRowSchema implements DeserializationSchema<SeaTunnelRow> {
+        private final SeaTunnelRowType producedType;
+
+        private SimpleStringRowSchema(SeaTunnelRowType producedType) {
+            this.producedType = producedType;
+        }
+
+        @Override
+        public SeaTunnelRow deserialize(byte[] message) throws IOException {
+            String v = new String(message, StandardCharsets.UTF_8);
+            return new SeaTunnelRow(new Object[] {v});
+        }
+
+        @Override
+        public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+            return producedType;
+        }
+    }
+
+    /** Collector that appends every collected row to the list supplied by the test. */
+    private static class TestCollector implements Collector<SeaTunnelRow> {
+        private final List<SeaTunnelRow> out;
+
+        private TestCollector(List<SeaTunnelRow> out) {
+            this.out = out;
+        }
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            out.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return this;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index adde5bc861..e550c99590 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -65,10 +65,15 @@ public class KafkaSourceConfigTest {
 
         DeserializationSchema<SeaTunnelRow> deserializationSchema =
                 
sourceConfig.getMapMetadata().get(TablePath.of("test")).getDeserializationSchema();
-        Assertions.assertTrue(
-                deserializationSchema instanceof 
DebeziumJsonDeserializationSchemaDispatcher);
+
+        Assertions.assertTrue(deserializationSchema instanceof 
KafkaEventTimeDeserializationSchema);
+
+        DeserializationSchema<SeaTunnelRow> innerSchema =
+                ((KafkaEventTimeDeserializationSchema) 
deserializationSchema).getDelegate();
+
+        Assertions.assertTrue(innerSchema instanceof 
DebeziumJsonDeserializationSchemaDispatcher);
         Assertions.assertNotNull(
-                ((DebeziumJsonDeserializationSchemaDispatcher) 
deserializationSchema)
+                ((DebeziumJsonDeserializationSchemaDispatcher) innerSchema)
                         .getTableDeserializationMap()
                         .get(TablePath.of("test.test.test")));
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 124ed7ff17..38c9184139 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -469,6 +469,36 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(1, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    /**
+     * Checks that the Kafka record timestamp reaches the job as event-time metadata when the
+     * source uses the text format.
+     *
+     * <p>Records are produced to {@code test_topic_text_eventtime} with an explicit timestamp,
+     * then {@code kafka_source_text_with_event_time_to_assert.conf} is executed and must exit
+     * with code 0. The test is disabled on the Spark engine.
+     */
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason =
+                    "The implementation of the Spark engine does not currently support metadata.")
+    public void testSourceKafkaTextEventTimeToAssert(TestContainer container)
+            throws IOException, InterruptedException {
+        final String topic = "test_topic_text_eventtime";
+        final long fixedTimestamp = 1738395840000L;
+        final TextSerializationSchema textSerializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+
+        // Partition and key are left null; only the record timestamp is pinned.
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                topic, null, fixedTimestamp, null, textSerializer.serialize(row)),
+                0,
+                10);
+
+        Container.ExecResult result =
+                container.executeJob(
+                        "/textFormatIT/kafka_source_text_with_event_time_to_assert.conf");
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafka(TestContainer container) throws IOException, 
InterruptedException {
         testKafkaLatestToConsole(container);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
new file mode 100644
index 0000000000..6ccd055c1f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Run as a batch job with a single parallel task.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Kafka source: reads topic test_topic_text_eventtime from the earliest offset as
+# comma-delimited text and exposes the rows as "kafka_table".
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text_eventtime"
+    plugin_output = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    format = text
+    field_delimiter = ","
+  }
+}
+
+# Metadata transform: maps the EventTime metadata of each "kafka_table" row to an
+# output field named event_time_ms, published as "kafka_table_with_meta".
+transform {
+  Metadata {
+    plugin_input = "kafka_table"
+    plugin_output = "kafka_table_with_meta"
+    metadata_fields = {
+      EventTime = event_time_ms
+    }
+  }
+}
+
+# Assert sink: requires event_time_ms to be a bigint field that is never null.
+sink {
+  Assert {
+    plugin_input = "kafka_table_with_meta"
+    rules = {
+      field_rules = [
+        {
+          field_name = event_time_ms
+          field_type = bigint
+          field_value = [
+            { rule_type = NOT_NULL }
+          ]
+        }
+      ]
+    }
+  }
+}
+
diff --git 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index 1d0974668b..0a9e8867ea 100644
--- 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -122,6 +124,7 @@ public class CompatibleKafkaConnectDeserializationSchema
             for (int i = 0; i < arrayNode.size(); i++) {
                 SeaTunnelRow row = convertJsonNode(arrayNode.get(i));
                 row.setRowKind(rowKind);
+                attachEventTime(row, msg.timestamp());
                 if (tablePath.isPresent()) {
                     row.setTableId(tablePath.toString());
                 }
@@ -130,6 +133,7 @@ public class CompatibleKafkaConnectDeserializationSchema
         } else {
             SeaTunnelRow row = convertJsonNode(payload);
             row.setRowKind(rowKind);
+            attachEventTime(row, msg.timestamp());
             if (tablePath.isPresent()) {
                 row.setTableId(tablePath.toString());
             }
@@ -176,6 +180,16 @@ public class CompatibleKafkaConnectDeserializationSchema
         return seaTunnelRowType;
     }
 
+    /**
+     * Stores the record timestamp as the row's event time unless one is already present.
+     *
+     * <p>Nothing is written when {@code row} is {@code null}, when {@code timestamp} is negative,
+     * or when the row options already hold a non-null value under
+     * {@code CommonOptions.EVENT_TIME}.
+     *
+     * @param row the converted row to annotate; may be {@code null}
+     * @param timestamp the record timestamp supplied by the caller ({@code msg.timestamp()})
+     */
+    private void attachEventTime(SeaTunnelRow row, long timestamp) {
+        boolean attachable = row != null && timestamp >= 0;
+        if (attachable && row.getOptions().get(CommonOptions.EVENT_TIME.getName()) == null) {
+            MetadataUtil.setEventTime(row, timestamp);
+        }
+    }
+
     private void tryInitConverter() {
         if (keyConverter == null) {
             synchronized (this) {
diff --git 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
index d4aca3ff4b..e6fa90100e 100644
--- 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -112,6 +114,7 @@ public class NativeKafkaConnectDeserializationSchema
 
         SeaTunnelRow row = convertJsonNode(record);
         row.setRowKind(rowKind);
+        attachEventTime(row, msg.timestamp());
         if (tablePath.isPresent()) {
             row.setTableId(tablePath.toString());
         }
@@ -199,4 +202,14 @@ public class NativeKafkaConnectDeserializationSchema
             }
         }
     }
+
+    /**
+     * Records {@code timestamp} as the event time of {@code row}, keeping any event time the row
+     * already carries.
+     *
+     * <p>The call is a no-op for a {@code null} row, for a negative timestamp, and for a row
+     * whose options already map {@code CommonOptions.EVENT_TIME} to a non-null value.
+     *
+     * @param row the converted row to annotate; may be {@code null}
+     * @param timestamp the record timestamp supplied by the caller ({@code msg.timestamp()})
+     */
+    private void attachEventTime(SeaTunnelRow row, long timestamp) {
+        if (row != null && timestamp >= 0) {
+            String eventTimeKey = CommonOptions.EVENT_TIME.getName();
+            if (row.getOptions().get(eventTimeKey) == null) {
+                MetadataUtil.setEventTime(row, timestamp);
+            }
+        }
+    }
 }

Reply via email to