Adamyuanyuan opened a new pull request, #9994:
URL: https://github.com/apache/seatunnel/pull/9994
### Purpose of this pull request
Motivation
- Problem: In Kafka->Hive streaming, partitioning on CURRENT_DATE()/CURRENT_TIMESTAMP() assigns each record to the date on which it is processed. Replayed records therefore land in the wrong partition. Parsing a create_date field from the payload instead is brittle, because the data contains dirty and mixed date formats.
- Goal: Reuse SeaTunnel's metadata mechanism to inject the Kafka ConsumerRecord.timestamp as EventTime. Users can then materialize it with the Metadata transform for use in SQL and partitioning.
Design
- In KafkaRecordEmitter: capture ConsumerRecord.timestamp for each record. In OutputCollector.collect, if the record is a SeaTunnelRow and timestamp >= 0, call MetadataUtil.setEventTime(row, ts).
- No schema change and no new mandatory options: injection is on by default. Users opt in to materializing the value via the Metadata transform (e.g., mapping EventTime to kafka_ts).
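The emitter-side rule above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the PR's code. `SketchRow`, `EventTimeInjectionSketch`, and the plain `"EventTime"` map key are hypothetical stand-ins for SeaTunnelRow, the collector in KafkaRecordEmitter, and MetadataUtil.setEventTime, whose real signatures are not shown here. The `>= 0` guard reflects that Kafka reports -1 (`ConsumerRecord.NO_TIMESTAMP`) when a record carries no timestamp.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SeaTunnelRow: field values plus a metadata map.
final class SketchRow {
    private final Object[] fields;
    private final Map<String, Object> metadata = new HashMap<>();

    SketchRow(Object... fields) {
        this.fields = fields;
    }

    Object field(int index) {
        return fields[index];
    }

    Map<String, Object> metadata() {
        return metadata;
    }
}

// Hypothetical stand-in for the collect logic described for KafkaRecordEmitter.
final class EventTimeInjectionSketch {
    // Assumed metadata key; the PR writes it through MetadataUtil.setEventTime(row, ts).
    static final String EVENT_TIME_KEY = "EventTime";

    // Attaches the Kafka record timestamp (epoch millis) to rows only, and only when
    // it is present (>= 0). Every record is still emitted, with or without a timestamp.
    static void collect(Object deserialized, long recordTimestamp, List<Object> out) {
        if (deserialized instanceof SketchRow && recordTimestamp >= 0) {
            ((SketchRow) deserialized).metadata().put(EVENT_TIME_KEY, recordTimestamp);
        }
        out.add(deserialized);
    }
}
```

Because the timestamp travels as row metadata rather than as a new column, the row's fields are untouched. This is consistent with the "no schema change" point: the value only becomes a column when a Metadata transform maps it.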
### Does this PR introduce _any_ user-facing change?
Yes. Users can expose the Kafka record timestamp as a column and use it for partitioning and in further transforms, for example:
```
transform {
  Metadata {
    source_table_name = "result_table"
    result_table_name = "result_with_meta"
    metadata_fields = { EventTime = "kafka_ts" }
  }
  Sql {
    source_table_name = "result_with_meta"
    result_table_name = "source_table"
    query = "select ..., FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai') as pt from result_with_meta where kafka_ts >= 0"
  }
}
```
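To see why this fixes replay, the sketch below computes roughly what the `FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai')` expression yields, written in plain Java with `java.time`. It is an illustrative equivalent at day granularity, not SeaTunnel's SQL engine; `PartitionFromKafkaTs` and `partitionOf` are hypothetical names. The partition depends only on the record's own timestamp and the zone, so replaying the same record always produces the same partition, unlike CURRENT_DATE().

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

// Hypothetical helper mirroring the pt expression in the Sql transform config.
final class PartitionFromKafkaTs {
    private static final DateTimeFormatter PT_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Renders the Kafka timestamp (epoch millis) as a yyyy-MM-dd date in the given zone.
    // Returns empty for missing timestamps (< 0), like the `where kafka_ts >= 0` filter.
    static Optional<String> partitionOf(long kafkaTsMillis, String zone) {
        if (kafkaTsMillis < 0) {
            return Optional.empty();
        }
        return Optional.of(PT_FORMAT.format(Instant.ofEpochMilli(kafkaTsMillis).atZone(ZoneId.of(zone))));
    }
}
```

For example, `partitionOf(1700000000000L, "Asia/Shanghai")` (2023-11-14T22:13:20Z) returns `2023-11-15`, while the same timestamp in `"UTC"` gives `2023-11-14`. This is why the time zone argument matters for daily partitions.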
### How was this patch tested?
Yes, covered by unit tests (UT) and end-to-end (E2E) tests.
### Check list
* [ ] If any new Jar binary package is added in your PR, please add the License
Notice according to the
[New License
Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new
feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing the connector code, please check that the
following files are updated:
1. Update
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
and add new connector information in it
2. Update the pom file of
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
3. Add ci label in
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
4. Add e2e testcase in
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
5. Update connector
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)