Adamyuanyuan opened a new pull request, #9994:
URL: https://github.com/apache/seatunnel/pull/9994

   
   ### Purpose of this pull request
   
   Motivation
   
   - Problem: In Kafka->Hive streaming jobs, deriving partitions from CURRENT_DATE()/CURRENT_TIMESTAMP() places records in the wrong partition when data is replayed, and parsing create_date is brittle because its values are dirty or in mixed formats.
   - Goal: Reuse SeaTunnel's metadata mechanism to inject the Kafka ConsumerRecord.timestamp as EventTime, then let users materialize it via the Metadata transform for use in SQL and partitioning.
   
   Design
   
   - In KafkaRecordEmitter: capture ConsumerRecord.timestamp for each record; in OutputCollector.collect, if the record is a SeaTunnelRow and timestamp >= 0, call MetadataUtil.setEventTime(row, ts).
   - No schema change and no mandatory new options; injection is on by default. Users opt in to materializing the value via the Metadata transform (e.g., mapping EventTime to kafka_ts).
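
The injection step described in the Design notes can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: `Row`, `Collector`, `setEventTime`, and `emit` are hypothetical stand-ins for SeaTunnelRow, the OutputCollector, MetadataUtil.setEventTime, and the emitter, and the `"EventTime"` map key is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EventTimeInjectionSketch {

    /** Hypothetical stand-in for SeaTunnelRow: field values plus out-of-band metadata. */
    public static final class Row {
        public final Object[] fields;
        public final Map<String, Object> options = new HashMap<>();

        public Row(Object... fields) {
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for MetadataUtil.setEventTime(row, ts). */
    public static void setEventTime(Row row, long ts) {
        row.options.put("EventTime", ts);
    }

    /** Hypothetical stand-in for the collector used by the record emitter. */
    public static final class Collector {
        public final List<Object> out = new ArrayList<>();
        // Timestamp of the Kafka record currently being emitted; negative means "none".
        public long currentTimestamp = -1L;

        public void collect(Object record) {
            // Attach the event time only to row records that carry a valid timestamp.
            if (record instanceof Row && currentTimestamp >= 0) {
                setEventTime((Row) record, currentTimestamp);
            }
            out.add(record);
        }
    }

    /** Mimics the emitter: remember the record timestamp, then collect the row. */
    public static Row emit(Collector collector, long recordTimestamp, Object... fields) {
        collector.currentTimestamp = recordTimestamp;
        Row row = new Row(fields);
        collector.collect(row);
        return row;
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        Row withTs = emit(collector, 1700000000000L, "order-1");
        Row withoutTs = emit(collector, -1L, "order-2");
        System.out.println(withTs.options);    // metadata carries EventTime
        System.out.println(withoutTs.options); // metadata stays empty
    }
}
```

Because the timestamp travels as row metadata rather than as a column, the row schema is unchanged until a user maps EventTime to a field. The `>= 0` guard skips records without a usable timestamp (Kafka reports -1 when none is available).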
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Users can materialize the Kafka record timestamp with the Metadata transform and then use it in downstream transforms and for partitioning, for example:
   ```
   transform {
     Metadata {
       source_table_name = "result_table"
       result_table_name = "result_with_meta"
       metadata_fields = { EventTime = "kafka_ts" }
     }
     Sql {
       source_table_name = "result_with_meta"
       result_table_name = "source_table"
       query = "select ..., FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai') as pt from result_with_meta where kafka_ts >= 0"
     }
   }
   ```
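
To make the partition expression concrete, the sketch below reproduces in plain Java what the example intends `FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai')` to compute: an epoch-millisecond timestamp turned into a date string in a fixed time zone. The class and method names are hypothetical and exist only for illustration; this is not SeaTunnel code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionFromKafkaTs {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Converts an epoch-millisecond timestamp to a yyyy-MM-dd partition value in the given zone. */
    public static String partitionDate(long kafkaTsMillis, String zoneId) {
        // Match the SQL expression: drop milliseconds, then format in the target zone.
        long epochSeconds = kafkaTsMillis / 1000;
        return Instant.ofEpochSecond(epochSeconds)
                .atZone(ZoneId.of(zoneId))
                .format(DAY);
    }

    public static void main(String[] args) {
        long ts = 1700000000000L; // 2023-11-14T22:13:20Z
        System.out.println(partitionDate(ts, "Asia/Shanghai")); // 2023-11-15
        System.out.println(partitionDate(ts, "UTC"));           // 2023-11-14
    }
}
```

The same record timestamp always maps to the same partition no matter when the job runs, which is why it is safe under replay where CURRENT_DATE() is not.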
   
   ### How was this patch tested?
   
   Yes, with unit tests (UT) and end-to-end (E2E) tests.
   
   ### Check list
   
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing connector code, please check that the following files are updated:
     1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
     2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update the connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

