Adamyuanyuan opened a new issue, #9990: URL: https://github.com/apache/seatunnel/issues/9990
### Search before asking - [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement. ### Description Motivation - Problem: In Kafka->Hive streaming, using CURRENT_DATE()/CURRENT_TIMESTAMP() misplaces records when replaying; parsing create_date is brittle due to dirty/mixed formats. - Goal: Reuse SeaTunnel’s metadata mechanism to inject Kafka ConsumerRecord.timestamp as EventTime, then let users materialize it via the Metadata transform for SQL/partitioning. Design - In KafkaRecordEmitter: capture ConsumerRecord.timestamp per record; in OutputCollector.collect, if record is SeaTunnelRow and timestamp>=0, call MetadataUtil.setEventTime(row, ts). - No schema change, no mandatory new options; injection is on by default. Users opt-in to materialize via the Metadata transform (e.g., mapping EventTime to kafka_ts). - Semantics align with Flink CDC’s metadata.list: CDC exposes readable metadata; SeaTunnel follows the unified pattern “source injects row metadata + materialize via Metadata transform”. Config & Example - No required new option. Example: ``` transform { Metadata { source_table_name = "result_table" result_table_name = "result_with_meta" metadata_fields = { EventTime = "kafka_ts" } } Sql { source_table_name = "result_with_meta" result_table_name = "source_table" query = "select ..., FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai') as pt from result_with_meta where kafka_ts >= 0" } } ``` ### Usage Scenario user can use Metadata { source_table_name = "result_table" result_table_name = "result_with_meta" metadata_fields = { EventTime = "kafka_ts" } } to partitioning and transforms ### Related issues _No response_ ### Are you willing to submit a PR? - [x] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
