This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e4ec14a958 [Feature][Kafka source] Inject Kafka record timestamp as EventTime metadata (#9994)
e4ec14a958 is described below
commit e4ec14a9581358b67482c302ea867252c6839843
Author: Adam Wang <[email protected]>
AuthorDate: Mon Dec 1 14:21:10 2025 +0800
    [Feature][Kafka source] Inject Kafka record timestamp as EventTime metadata (#9994)
Co-authored-by: wangxiaogang <[email protected]>
---
docs/en/connector-v2/source/Kafka.md | 32 ++-
docs/en/transform-v2/metadata.md | 73 ++++++-
docs/zh/connector-v2/source/Kafka.md | 34 +++-
docs/zh/transform-v2/metadata.md | 75 ++++++-
.../KafkaEventTimeDeserializationSchema.java | 122 ++++++++++++
.../seatunnel/kafka/source/KafkaRecordEmitter.java | 4 +
.../seatunnel/kafka/source/KafkaSourceConfig.java | 219 +++++++++++++--------
.../kafka/source/KafkaRecordEmitterTest.java | 165 ++++++++++++++++
.../kafka/source/KafkaSourceConfigTest.java | 11 +-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 30 +++
...afka_source_text_with_event_time_to_assert.conf | 80 ++++++++
...ompatibleKafkaConnectDeserializationSchema.java | 14 ++
.../NativeKafkaConnectDeserializationSchema.java | 13 ++
13 files changed, 774 insertions(+), 98 deletions(-)
diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index fcd0d21f3e..9c0363c89e 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -75,6 +75,36 @@ debezium_record_table_filter {
Only the data of the `test.public.products` table will be consumed.
+## Metadata Support
+
+The Kafka source automatically injects `ConsumerRecord.timestamp` into the SeaTunnel `EventTime` metadata when the value is non-negative. You can expose it as a normal field through the [Metadata transform](../../transform-v2/metadata.md) for downstream SQL or partitioning.
+
+```hocon
+source {
+ Kafka {
+ plugin_output = "kafka_raw"
+ topic = "seatunnel_topic"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "kafka_raw"
+ plugin_output = "kafka_with_meta"
+ metadata_fields {
+ EventTime = kafka_ts # kafka_ts will contain ConsumerRecord.timestamp (ms)
+ }
+ }
+ Sql {
+ plugin_input = "kafka_with_meta"
+ plugin_output = "kafka_enriched"
+ query = "select *, FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd',
'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+ }
+}
+```
+
## Task Example
### Simple
@@ -430,4 +460,4 @@ Note:key/value is of type byte[].
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index be4be03c17..1a6ecf1ad4 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -24,14 +24,15 @@ The Metadata transform plugin is used to extract metadata information from data
| Database | string | Name of the database containing the data | All connectors |
| Table | string | Name of the table containing the data | All connectors |
| RowKind | string | Row change type, values: +I (insert), -U (update before), +U (update after), -D (delete) | All connectors |
-| EventTime | long | Event timestamp of data change (milliseconds) | CDC connectors |
+| EventTime | long | Event timestamp of data change (milliseconds) | CDC connectors; Kafka source (ConsumerRecord.timestamp) |
| Delay | long | Data collection delay time (milliseconds), i.e., the difference between data extraction time and database change time | CDC connectors |
| Partition | string | Partition information of the data, multiple partition fields separated by commas | Connectors supporting partitions |
### Important Notes
1. **Metadata field names are case-sensitive**: Configuration must strictly follow the Key names in the table above (e.g., `Database`, `Table`, `RowKind`, etc.)
-2. **CDC-specific fields**: `EventTime` and `Delay` are only valid when using CDC connectors (except TiDB-CDC)
+2. **Time fields**: `Delay` is only valid when using CDC connectors (except TiDB-CDC). `EventTime` is provided by CDC connectors and also by the Kafka source via `ConsumerRecord.timestamp` when available.
+3. **Kafka event time**: The Kafka source writes `ConsumerRecord.timestamp` (milliseconds) into `EventTime` when it is non-negative, so you can surface it with the `Metadata` transform.
## Options
@@ -171,3 +172,71 @@ sink {
}
}
```
+
+### Example 3: Kafka record time for partitioning
+
+Expose Kafka `ConsumerRecord.timestamp` (injected into `EventTime`) as `kafka_ts`, convert it to a partition field, and write to Hive. This pattern is useful when replaying Kafka data and aligning partitions by the original record time.
+
+```hocon
+env {
+ execution.parallelism = 4
+ job.mode = "STREAMING"
+ checkpoint.interval = 60000
+}
+
+source {
+ Kafka {
+ plugin_output = "kafka_raw"
+ schema = {
+ fields {
+ id = bigint
+ customer_type = string
+ data = string
+ }
+ }
+ format = text
+ field_delimiter = "|"
+ topic = "push_report_event"
+ bootstrap.servers = "kafka-broker-1:9092,kafka-broker-2:9092"
+ consumer.group = "seatunnel_event_backfill"
+ kafka.config = {
+ max.poll.records = 100
+ auto.offset.reset = "earliest"
+ enable.auto.commit = "false"
+ }
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "kafka_raw"
+ plugin_output = "kafka_with_meta"
+ metadata_fields = {
+ EventTime = "kafka_ts"
+ }
+ }
+
+ Sql {
+ plugin_input = "kafka_with_meta"
+ plugin_output = "source_table"
+ query = "select id, customer_type, data, FROM_UNIXTIME(kafka_ts/1000,
'yyyy-MM-dd', 'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+ }
+}
+
+sink {
+ Hive {
+ table_name = "example_db.ods_sys_event_report"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ hdfs_site_path = "/path/to/hdfs-site.xml"
+ hive_site_path = "/path/to/hive-site.xml"
+ krb5_path = "/path/to/krb5.conf"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/path/to/hive.keytab"
+ overwrite = false
+ plugin_input = "source_table"
+ # compress_codec = "SNAPPY"
+ }
+}
+```
+
+Here `pt` is derived from the Kafka event time and can be used as a Hive partition column.
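
For reference, a minimal Java sketch of the conversion that `FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai')` performs in the query above. It is illustrative only and not part of this patch; the job itself relies on the SQL transform.

```java
// Illustrative only: format the Kafka record timestamp (epoch milliseconds, as injected
// into EventTime) as a 'yyyy-MM-dd' partition value in the Asia/Shanghai zone.
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class KafkaTsToPartition {
    private static final DateTimeFormatter PARTITION_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Asia/Shanghai"));

    static String toPartition(long kafkaTsMillis) {
        // Same result as the SQL expression, e.g. 1738395840000L -> "2025-02-01"
        return PARTITION_FORMAT.format(Instant.ofEpochMilli(kafkaTsMillis));
    }
}
```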
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 643b584cc3..0f16ef3cf4 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -69,12 +69,42 @@ import ChangeLog from '../changelog/connector-kafka.md';
debezium_record_table_filter {
database_name = "test"
schema_name = "public" // null 如果不存在
- table_name = "products"
+ table_name = "products"
}
```
只有 `test.public.products` 表的数据将被消费。
+## 元数据支持
+
+Kafka 源会在 `ConsumerRecord.timestamp` 大于等于 0 时,将其自动写入 SeaTunnel 行的 `EventTime` 元数据。可以借助 [Metadata 转换](../../transform-v2/metadata.md) 把这段时间戳暴露为普通字段,方便做分区或下游 SQL 处理。
+
+```hocon
+source {
+ Kafka {
+ plugin_output = "kafka_raw"
+ topic = "seatunnel_topic"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "kafka_raw"
+ plugin_output = "kafka_with_meta"
+ metadata_fields {
+ EventTime = kafka_ts # ConsumerRecord.timestamp (ms)
+ }
+ }
+ Sql {
+ plugin_input = "kafka_with_meta"
+ plugin_output = "kafka_enriched"
+ query = "select *, FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd',
'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+ }
+}
+```
+
## 任务示例
### 简单示例
@@ -423,4 +453,4 @@ source {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 4a714901a7..1567fd20ee 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -24,14 +24,15 @@ Metadata 转换插件用于将数据行中的元数据信息提取并转换为
| Database | string | 数据所属的数据库名称 | 所有连接器 |
| Table | string | 数据所属的表名称 | 所有连接器 |
| RowKind | string | 行的变更类型,值为:+I(插入)、-U(更新前)、+U(更新后)、-D(删除) | 所有连接器 |
-| EventTime | long | 数据变更的事件时间戳(毫秒) | CDC 连接器 |
+| EventTime | long | 数据变更的事件时间戳(毫秒) | CDC 连接器;Kafka 源(ConsumerRecord.timestamp) |
| Delay | long | 数据采集延迟时间(毫秒),即数据抽取时间与数据库变更时间的差值 | CDC 连接器 |
| Partition | string | 数据所属的分区信息,多个分区字段使用逗号分隔 | 支持分区的连接器 |
### 重要说明
-1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)
-2. **CDC 专有字段**:`EventTime` 和 `Delay` 仅在使用 CDC 连接器时有效(TiDB-CDC 除外)
+1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)。
+2. **时间相关字段**:`Delay` 仅在 CDC 连接器有效(TiDB-CDC 除外);`EventTime` 由 CDC 连接器写入,也会在 Kafka 源中使用 `ConsumerRecord.timestamp`(毫秒,非负时)写入。
+3. **Kafka 事件时间**:Kafka 源会在 `ConsumerRecord.timestamp` 非负时写入 `EventTime`,可通过 Metadata 转换将其暴露为普通字段。
## 配置选项
@@ -171,3 +172,71 @@ sink {
}
}
```
+
+### 示例 3:Kafka 写入时间用于分区
+
+将 Kafka `ConsumerRecord.timestamp`(写入到 `EventTime` 元数据)暴露为普通字段,再生成分区字段并写入 Hive,适合回放或补数场景。
+
+```hocon
+env {
+ execution.parallelism = 4
+ job.mode = "STREAMING"
+ checkpoint.interval = 60000
+}
+
+source {
+ Kafka {
+ plugin_output = "kafka_raw"
+ schema = {
+ fields {
+ id = bigint
+ customer_type = string
+ data = string
+ }
+ }
+ format = text
+ field_delimiter = "|"
+ topic = "push_report_event"
+ bootstrap.servers = "kafka-broker-1:9092,kafka-broker-2:9092"
+ consumer.group = "seatunnel_event_backfill"
+ kafka.config = {
+ max.poll.records = 100
+ auto.offset.reset = "earliest"
+ enable.auto.commit = "false"
+ }
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "kafka_raw"
+ plugin_output = "kafka_with_meta"
+ metadata_fields = {
+ EventTime = "kafka_ts"
+ }
+ }
+
+ Sql {
+ plugin_input = "kafka_with_meta"
+ plugin_output = "source_table"
+ query = "select id, customer_type, data, FROM_UNIXTIME(kafka_ts/1000,
'yyyy-MM-dd', 'Asia/Shanghai') as pt from kafka_with_meta where kafka_ts >= 0"
+ }
+}
+
+sink {
+ Hive {
+ table_name = "example_db.ods_sys_event_report"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ hdfs_site_path = "/path/to/hdfs-site.xml"
+ hive_site_path = "/path/to/hive-site.xml"
+ krb5_path = "/path/to/krb5.conf"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/path/to/hive.keytab"
+ overwrite = false
+ plugin_input = "source_table"
+ # compress_codec = "SNAPPY"
+ }
+}
+```
+
+上面的 `pt` 字段由 Kafka 事件时间转换而来,可在 Hive 中作为分区列使用,便于补数和校准分区。
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java
new file mode 100644
index 0000000000..9916f02748
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaEventTimeDeserializationSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+
+/**
+ * A {@link DeserializationSchema} wrapper that attaches the Kafka record timestamp as {@code
+ * CommonOptions.EVENT_TIME} metadata to emitted {@link SeaTunnelRow}s.
+ *
+ * <p>The timestamp for the current record is provided via {@link #setCurrentRecordTimestamp(Long)}
+ * before deserialization is invoked.
+ */
+public class KafkaEventTimeDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private final DeserializationSchema<SeaTunnelRow> delegate;
+
+ private Long currentRecordTimestamp;
+
+ public KafkaEventTimeDeserializationSchema(DeserializationSchema<SeaTunnelRow> delegate) {
+ this.delegate = delegate;
+ }
+
+ public DeserializationSchema<SeaTunnelRow> getDelegate() {
+ return delegate;
+ }
+
+ public void setCurrentRecordTimestamp(Long timestamp) {
+ this.currentRecordTimestamp = timestamp;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ SeaTunnelRow row = delegate.deserialize(message);
+ if (row == null) {
+ return null;
+ }
+ attachEventTime(row);
+ return row;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+ delegate.deserialize(
+ message,
+ new Collector<SeaTunnelRow>() {
+ @Override
+ public void collect(SeaTunnelRow record) {
+ attachEventTime(record);
+ out.collect(record);
+ }
+
+ @Override
+ public void markSchemaChangeBeforeCheckpoint() {
+ out.markSchemaChangeBeforeCheckpoint();
+ }
+
+ @Override
+ public void collect(
+ org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent event) {
+ out.collect(event);
+ }
+
+ @Override
+ public void markSchemaChangeAfterCheckpoint() {
+ out.markSchemaChangeAfterCheckpoint();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return out.getCheckpointLock();
+ }
+
+ @Override
+ public boolean isEmptyThisPollNext() {
+ return out.isEmptyThisPollNext();
+ }
+
+ @Override
+ public void resetEmptyThisPollNext() {
+ out.resetEmptyThisPollNext();
+ }
+ });
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return delegate.getProducedType();
+ }
+
+ private void attachEventTime(SeaTunnelRow row) {
+ if (row == null || currentRecordTimestamp == null || currentRecordTimestamp < 0) {
+ return;
+ }
+ Object existing = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+ if (existing == null) {
+ MetadataUtil.setEventTime(row, currentRecordTimestamp);
+ }
+ }
+}
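
For orientation, a minimal sketch of how this wrapper is driven per record, based on the `KafkaRecordEmitter` change below. It is illustrative and not part of the patch; `innerSchema` stands for whichever format-specific schema the source builds, and the sketch assumes it sits in the same package as the wrapper.

```java
// Minimal sketch: the emitter sets the current record's timestamp before delegating,
// so the wrapper can attach it as EventTime metadata on the produced row.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;

class EventTimeWrapperSketch {
    static Object deserializeAndReadEventTime(
            DeserializationSchema<SeaTunnelRow> innerSchema,
            ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        KafkaEventTimeDeserializationSchema wrapper =
                new KafkaEventTimeDeserializationSchema(innerSchema);
        // Epoch millis from the producer/broker; may be negative when no timestamp is set.
        wrapper.setCurrentRecordTimestamp(record.timestamp());
        SeaTunnelRow row = wrapper.deserialize(record.value());
        // Non-negative timestamps are surfaced in the row options under EventTime.
        return row == null ? null : row.getOptions().get(CommonOptions.EVENT_TIME.getName());
    }
}
```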
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index 1e3811ba49..66810cf33c 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -61,6 +61,10 @@ public class KafkaRecordEmitter
// todo there is an additional loss in this place for non-multi-table scenarios
DeserializationSchema<SeaTunnelRow> deserializationSchema =
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
+ if (deserializationSchema instanceof KafkaEventTimeDeserializationSchema) {
+ ((KafkaEventTimeDeserializationSchema) deserializationSchema)
+ .setCurrentRecordTimestamp(consumerRecord.timestamp());
+ }
try {
if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
((CompatibleKafkaConnectDeserializationSchema) deserializationSchema)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 97e5451ce5..ce2c38cbb0 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.options.table.TableIdentifierOptions;
import org.apache.seatunnel.api.options.table.TableSchemaOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -266,20 +268,38 @@ public class KafkaSourceConfig implements Serializable {
}
TablePath tablePath = getTablePathFromSchema(readonlyConfig,
readonlyConfig.get(TOPIC));
- return CatalogTable.of(
- TableIdentifier.of("", tablePath),
- tableSchema,
- new HashMap<String, String>() {
- {
-
Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
- .ifPresent(value ->
put(PROTOBUF_MESSAGE_NAME.key(), value));
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("", tablePath),
+ tableSchema,
+ new HashMap<String, String>() {
+ {
+
Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
+ .ifPresent(
+ value ->
put(PROTOBUF_MESSAGE_NAME.key(), value));
-
Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
- .ifPresent(value -> put(PROTOBUF_SCHEMA.key(),
value));
- }
- },
- Collections.emptyList(),
- null);
+
Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
+ .ifPresent(value ->
put(PROTOBUF_SCHEMA.key(), value));
+ }
+ },
+ Collections.emptyList(),
+ null);
+
+ // Expose Kafka record timestamp as metadata 'EventTime' for Metadata transform
+ MetadataSchema metadataSchema =
+ MetadataSchema.builder()
+ .column(
+ MetadataColumn.of(
+
org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME
+ .getName(),
+ BasicType.LONG_TYPE,
+ 0L,
+ true,
+ null,
+ null))
+ .build();
+
+ return CatalogTable.withMetadata(catalogTable, metadataSchema);
}
private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig,
String topicName) {
@@ -299,85 +319,110 @@ public class KafkaSourceConfig implements Serializable {
SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
MessageFormat format = readonlyConfig.get(FORMAT);
+ DeserializationSchema<SeaTunnelRow> schema;
+
if (format == MessageFormat.NATIVE) {
- return new NativeKafkaConnectDeserializationSchema(
- catalogTable, false, false, false, false);
+ schema =
+ new NativeKafkaConnectDeserializationSchema(
+ catalogTable, false, false, false, false);
+ } else if
(!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+ schema =
+ TextDeserializationSchema.builder()
+ .seaTunnelRowType(seaTunnelRowType)
+ .delimiter(TextFormatConstant.PLACEHOLDER)
+ .setCatalogTable(catalogTable)
+ .build();
+ } else {
+ switch (format) {
+ case JSON:
+ schema = new JsonDeserializationSchema(catalogTable,
false, false);
+ break;
+ case TEXT:
+ String delimiter = readonlyConfig.get(FIELD_DELIMITER);
+ schema =
+ TextDeserializationSchema.builder()
+ .seaTunnelRowType(seaTunnelRowType)
+ .delimiter(delimiter)
+ .build();
+ break;
+ case CANAL_JSON:
+ schema =
+
CanalJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build();
+ break;
+ case OGG_JSON:
+ schema =
+ OggJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build();
+ break;
+ case MAXWELL_JSON:
+ schema =
+
MaxWellJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build();
+ break;
+ case COMPATIBLE_KAFKA_CONNECT_JSON:
+ Boolean keySchemaEnable =
+ readonlyConfig.get(
+
KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
+ Boolean valueSchemaEnable =
+ readonlyConfig.get(
+
KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
+ schema =
+ new CompatibleKafkaConnectDeserializationSchema(
+ catalogTable, keySchemaEnable,
valueSchemaEnable, false, false);
+ break;
+ case DEBEZIUM_JSON:
+ boolean includeSchema =
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
+ TableIdentifierConfig tableFilter =
+ readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
+ if (tableFilter != null) {
+ TablePath tablePath =
+ TablePath.of(
+
StringUtils.isNotEmpty(tableFilter.getDatabaseName())
+ ? tableFilter.getDatabaseName()
+ : null,
+
StringUtils.isNotEmpty(tableFilter.getSchemaName())
+ ? tableFilter.getSchemaName()
+ : null,
+
StringUtils.isNotEmpty(tableFilter.getTableName())
+ ? tableFilter.getTableName()
+ : null);
+ Map<TablePath, DebeziumJsonDeserializationSchema>
tableDeserializationMap =
+ Collections.singletonMap(
+ tablePath,
+ new DebeziumJsonDeserializationSchema(
+ catalogTable, true,
includeSchema));
+ schema =
+ new
DebeziumJsonDeserializationSchemaDispatcher(
+ tableDeserializationMap, true,
includeSchema);
+ } else {
+ schema =
+ new DebeziumJsonDeserializationSchema(
+ catalogTable, true, includeSchema);
+ }
+ break;
+ case AVRO:
+ schema = new AvroDeserializationSchema(catalogTable);
+ break;
+ case PROTOBUF:
+ schema = new ProtobufDeserializationSchema(catalogTable);
+ break;
+ default:
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unsupported format: " + format);
+ }
}
- if
(!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
- return TextDeserializationSchema.builder()
- .seaTunnelRowType(seaTunnelRowType)
- .delimiter(TextFormatConstant.PLACEHOLDER)
- .setCatalogTable(catalogTable)
- .build();
+ if (schema instanceof NativeKafkaConnectDeserializationSchema
+ || schema instanceof CompatibleKafkaConnectDeserializationSchema) {
+ return schema;
}
- switch (format) {
- case JSON:
- return new JsonDeserializationSchema(catalogTable, false,
false);
- case TEXT:
- String delimiter = readonlyConfig.get(FIELD_DELIMITER);
- return TextDeserializationSchema.builder()
- .seaTunnelRowType(seaTunnelRowType)
- .delimiter(delimiter)
- .build();
- case CANAL_JSON:
- return CanalJsonDeserializationSchema.builder(catalogTable)
- .setIgnoreParseErrors(true)
- .build();
- case OGG_JSON:
- return OggJsonDeserializationSchema.builder(catalogTable)
- .setIgnoreParseErrors(true)
- .build();
- case MAXWELL_JSON:
- return MaxWellJsonDeserializationSchema.builder(catalogTable)
- .setIgnoreParseErrors(true)
- .build();
-
- case COMPATIBLE_KAFKA_CONNECT_JSON:
- Boolean keySchemaEnable =
- readonlyConfig.get(
-
KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
- Boolean valueSchemaEnable =
- readonlyConfig.get(
-
KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
- return new CompatibleKafkaConnectDeserializationSchema(
- catalogTable, keySchemaEnable, valueSchemaEnable,
false, false);
- case DEBEZIUM_JSON:
- boolean includeSchema =
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
- TableIdentifierConfig tableFilter =
- readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
- if (tableFilter != null) {
- TablePath tablePath =
- TablePath.of(
-
StringUtils.isNotEmpty(tableFilter.getDatabaseName())
- ? tableFilter.getDatabaseName()
- : null,
-
StringUtils.isNotEmpty(tableFilter.getSchemaName())
- ? tableFilter.getSchemaName()
- : null,
-
StringUtils.isNotEmpty(tableFilter.getTableName())
- ? tableFilter.getTableName()
- : null);
- Map<TablePath, DebeziumJsonDeserializationSchema>
tableDeserializationMap =
- Collections.singletonMap(
- tablePath,
- new DebeziumJsonDeserializationSchema(
- catalogTable, true,
includeSchema));
- return new DebeziumJsonDeserializationSchemaDispatcher(
- tableDeserializationMap, true, includeSchema);
- } else {
- return new DebeziumJsonDeserializationSchema(catalogTable,
true, includeSchema);
- }
- case AVRO:
- return new AvroDeserializationSchema(catalogTable);
- case PROTOBUF:
- return new ProtobufDeserializationSchema(catalogTable);
- default:
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported format: " + format);
- }
+ return new KafkaEventTimeDeserializationSchema(schema);
}
private TableSchema nativeTableSchema() {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
new file mode 100644
index 0000000000..14fd630304
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class KafkaRecordEmitterTest {
+
+ @Test
+ void emitRecordShouldAttachKafkaTimestampAsEventTime() throws Exception {
+ long kafkaTimestamp = 1690000000000L;
+
+ // Prepare a simple deserialization schema that creates a single-field
row from bytes
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"f0"}, new SeaTunnelDataType[]
{BasicType.STRING_TYPE});
+ DeserializationSchema<SeaTunnelRow> schema =
+ new KafkaEventTimeDeserializationSchema(new
SimpleStringRowSchema(rowType));
+
+ // Build ConsumerMetadata map for the table
+ ConsumerMetadata metadata = new ConsumerMetadata();
+ metadata.setDeserializationSchema(schema);
+ Map<TablePath, ConsumerMetadata> map = new HashMap<>();
+ TablePath tablePath = TablePath.DEFAULT;
+ map.put(tablePath, metadata);
+
+ KafkaRecordEmitter emitter = new KafkaRecordEmitter(map,
MessageFormatErrorHandleWay.FAIL);
+
+ // Mock ConsumerRecord<byte[], byte[]>
+ org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]>
record =
+
Mockito.mock(org.apache.kafka.clients.consumer.ConsumerRecord.class);
+ Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
+
Mockito.when(record.value()).thenReturn("hello".getBytes(StandardCharsets.UTF_8));
+ Mockito.when(record.offset()).thenReturn(100L);
+
+ // Prepare split state
+ KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new
TopicPartition("t", 0));
+ KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);
+
+ // Capture outputs
+ List<SeaTunnelRow> out = new ArrayList<>();
+ Collector<SeaTunnelRow> collector = new TestCollector(out);
+
+ emitter.emitRecord(record, collector, splitState);
+
+ Assertions.assertEquals(1, out.size());
+ SeaTunnelRow row = out.get(0);
+ Object eventTime =
row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+ Assertions.assertEquals(kafkaTimestamp, eventTime);
+
+ // Also verify split state offset advanced
+ Assertions.assertEquals(101L, splitState.getCurrentOffset());
+ }
+
+ @Test
+ void emitRecordShouldNotAttachEventTimeWhenTimestampNegative() throws
Exception {
+ long kafkaTimestamp = -1L; // invalid timestamp
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"f0"}, new SeaTunnelDataType[]
{BasicType.STRING_TYPE});
+ DeserializationSchema<SeaTunnelRow> schema =
+ new KafkaEventTimeDeserializationSchema(new
SimpleStringRowSchema(rowType));
+
+ ConsumerMetadata metadata = new ConsumerMetadata();
+ metadata.setDeserializationSchema(schema);
+ Map<TablePath, ConsumerMetadata> map = new HashMap<>();
+ TablePath tablePath = TablePath.DEFAULT;
+ map.put(tablePath, metadata);
+
+ KafkaRecordEmitter emitter = new KafkaRecordEmitter(map,
MessageFormatErrorHandleWay.FAIL);
+
+ org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]>
record =
+
Mockito.mock(org.apache.kafka.clients.consumer.ConsumerRecord.class);
+ Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
+
Mockito.when(record.value()).thenReturn("world".getBytes(StandardCharsets.UTF_8));
+ Mockito.when(record.offset()).thenReturn(5L);
+
+ KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new
TopicPartition("t2", 1));
+ KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);
+
+ List<SeaTunnelRow> out = new ArrayList<>();
+ Collector<SeaTunnelRow> collector = new TestCollector(out);
+
+ emitter.emitRecord(record, collector, splitState);
+
+ Assertions.assertEquals(1, out.size());
+ SeaTunnelRow row = out.get(0);
+
Assertions.assertFalse(row.getOptions().containsKey(CommonOptions.EVENT_TIME.getName()));
+ Assertions.assertEquals(6L, splitState.getCurrentOffset());
+ }
+
+ private static class SimpleStringRowSchema implements
DeserializationSchema<SeaTunnelRow> {
+ private final SeaTunnelRowType producedType;
+
+ private SimpleStringRowSchema(SeaTunnelRowType producedType) {
+ this.producedType = producedType;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ String v = new String(message, StandardCharsets.UTF_8);
+ return new SeaTunnelRow(new Object[] {v});
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return producedType;
+ }
+ }
+
+ private static class TestCollector implements Collector<SeaTunnelRow> {
+ private final List<SeaTunnelRow> out;
+
+ private TestCollector(List<SeaTunnelRow> out) {
+ this.out = out;
+ }
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ out.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return this;
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index adde5bc861..e550c99590 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -65,10 +65,15 @@ public class KafkaSourceConfigTest {
DeserializationSchema<SeaTunnelRow> deserializationSchema =
sourceConfig.getMapMetadata().get(TablePath.of("test")).getDeserializationSchema();
- Assertions.assertTrue(
- deserializationSchema instanceof DebeziumJsonDeserializationSchemaDispatcher);
+
+ Assertions.assertTrue(deserializationSchema instanceof KafkaEventTimeDeserializationSchema);
+
+ DeserializationSchema<SeaTunnelRow> innerSchema =
+ ((KafkaEventTimeDeserializationSchema) deserializationSchema).getDelegate();
+
+ Assertions.assertTrue(innerSchema instanceof DebeziumJsonDeserializationSchemaDispatcher);
Assertions.assertNotNull(
- ((DebeziumJsonDeserializationSchemaDispatcher) deserializationSchema)
+ ((DebeziumJsonDeserializationSchemaDispatcher) innerSchema)
.getTableDeserializationMap()
.get(TablePath.of("test.test.test")));
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 124ed7ff17..38c9184139 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -469,6 +469,36 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(1, execResult.getExitCode(),
execResult.getStderr());
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason =
+ "The implementation of the Spark engine does not currently support metadata.")
+ public void testSourceKafkaTextEventTimeToAssert(TestContainer container)
+ throws IOException, InterruptedException {
+ long fixedTimestamp = 1738395840000L;
+ TextSerializationSchema serializer =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+ .delimiter(",")
+ .build();
+ generateTestData(
+ row ->
+ new ProducerRecord<>(
+ "test_topic_text_eventtime",
+ null,
+ fixedTimestamp,
+ null,
+ serializer.serialize(row)),
+ 0,
+ 10);
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/textFormatIT/kafka_source_text_with_event_time_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
@TestTemplate
public void testSourceKafka(TestContainer container) throws IOException,
InterruptedException {
testKafkaLatestToConsole(container);
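
For context, a standalone sketch of producing records with an explicit timestamp, as the new test above does. It is illustrative only; the broker address and payload are placeholders, and it assumes the topic keeps the default `CreateTime` timestamp type so the producer-supplied value is what the source later observes as `ConsumerRecord.timestamp`.

```java
// Illustrative only: send a record whose timestamp is fixed by the producer,
// mirroring how the e2e test seeds test_topic_text_eventtime.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

class FixedTimestampProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: point at your broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        long fixedTimestamp = 1738395840000L; // epoch millis carried as ConsumerRecord.timestamp
        byte[] value = "1,dummy".getBytes(StandardCharsets.UTF_8); // placeholder text payload

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(
                    new ProducerRecord<>(
                            "test_topic_text_eventtime", null, fixedTimestamp, null, value));
        }
    }
}
```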
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
new file mode 100644
index 0000000000..6ccd055c1f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_text_eventtime"
+ plugin_output = "kafka_table"
+ start_mode = "earliest"
+ format_error_handle_way = fail
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ format = text
+ field_delimiter = ","
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "kafka_table"
+ plugin_output = "kafka_table_with_meta"
+ metadata_fields = {
+ EventTime = event_time_ms
+ }
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "kafka_table_with_meta"
+ rules = {
+ field_rules = [
+ {
+ field_name = event_time_ms
+ field_type = bigint
+ field_value = [
+ { rule_type = NOT_NULL }
+ ]
+ }
+ ]
+ }
+ }
+}
+
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index 1d0974668b..0a9e8867ea 100644
--- a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -122,6 +124,7 @@ public class CompatibleKafkaConnectDeserializationSchema
for (int i = 0; i < arrayNode.size(); i++) {
SeaTunnelRow row = convertJsonNode(arrayNode.get(i));
row.setRowKind(rowKind);
+ attachEventTime(row, msg.timestamp());
if (tablePath.isPresent()) {
row.setTableId(tablePath.toString());
}
@@ -130,6 +133,7 @@ public class CompatibleKafkaConnectDeserializationSchema
} else {
SeaTunnelRow row = convertJsonNode(payload);
row.setRowKind(rowKind);
+ attachEventTime(row, msg.timestamp());
if (tablePath.isPresent()) {
row.setTableId(tablePath.toString());
}
@@ -176,6 +180,16 @@ public class CompatibleKafkaConnectDeserializationSchema
return seaTunnelRowType;
}
+ private void attachEventTime(SeaTunnelRow row, long timestamp) {
+ if (row == null || timestamp < 0) {
+ return;
+ }
+ Object existing = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+ if (existing == null) {
+ MetadataUtil.setEventTime(row, timestamp);
+ }
+ }
+
private void tryInitConverter() {
if (keyConverter == null) {
synchronized (this) {
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
index d4aca3ff4b..e6fa90100e 100644
--- a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -112,6 +114,7 @@ public class NativeKafkaConnectDeserializationSchema
SeaTunnelRow row = convertJsonNode(record);
row.setRowKind(rowKind);
+ attachEventTime(row, msg.timestamp());
if (tablePath.isPresent()) {
row.setTableId(tablePath.toString());
}
@@ -199,4 +202,14 @@ public class NativeKafkaConnectDeserializationSchema
}
}
}
+
+ private void attachEventTime(SeaTunnelRow row, long timestamp) {
+ if (row == null || timestamp < 0) {
+ return;
+ }
+ Object existing = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+ if (existing == null) {
+ MetadataUtil.setEventTime(row, timestamp);
+ }
+ }
}