swpuEzio commented on issue #7084:
URL: https://github.com/apache/seatunnel/issues/7084#issuecomment-2213348666
> I spent about two days setting up Kafka to reproduce your issue, because I
> wanted to understand what Kafka messages look like by the time they reach
> the transform.
>
> First, your Kafka source is configured with the JSON format and declares a
> column named `address`, so the Kafka reader extracts only the `address`
> value from each JSON message. Downstream, the data therefore looks like this:
>
> ```json
> {
> "streetAddress": "naist street",
> "city": "Nara",
> "postalCode": "630-0192"
> }
> ```
>
> <img alt="image" width="1280"
src="https://private-user-images.githubusercontent.com/51348093/345082395-41122434-492e-4fe4-a77f-34a74fa0d01c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjA0MjcwOTYsIm5iZiI6MTcyMDQyNjc5NiwicGF0aCI6Ii81MTM0ODA5My8zNDUwODIzOTUtNDExMjI0MzQtNDkyZS00ZmU0LWE3N2YtMzRhNzRmYTBkMDFjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzA4VDA4MTk1NlomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTk5NTExNDRmZGI3YTdlNDc2Y2EyODUzZmI5YjA0ZmRhNTdmYzI0ZTAxY2M2MDUxNjRkYTk4OGY1OTUxMzA1MmUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.pkuZn5hnHcDxSQ7azGBn3vGAmta_Bh0lUr6NhA0_Xoc">
> Given this, the JSONPath configuration has to be adjusted accordingly.
> My modified configuration is as follows:
>
> ```
> env {
>   parallelism = 2
>   job.mode = "BATCH"
> }
>
> source {
>   Kafka {
>     schema = {
>       fields {
>         address = "string"
>       }
>     }
>     format = json
>     topic = "test2"
>     bootstrap.servers = "localhost:9092"
>     kafka.config = {
>       max.poll.records = 500
>       auto.offset.reset = "earliest"
>     }
>   }
> }
>
> transform {
>   JsonPath {
>     columns = [
>       {
>         src_field = "address"
>         path = "$.streetAddress"
>         dest_field = "streetAddress"
>       },
>       {
>         src_field = "address"
>         path = "$.city"
>         dest_field = "city"
>       },
>       {
>         src_field = "address"
>         path = "$.postalCode"
>         dest_field = "postalCode"
>       }
>     ]
>   }
> }
>
> sink {
>   Console {}
> }
> ```
>
> With this configuration the job runs correctly, as shown in the screenshot
> below.
>
> <img alt="image" width="1460"
src="https://private-user-images.githubusercontent.com/51348093/345083205-e5b94ec8-3952-49f9-b450-ee823f0a5be4.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjA0MjcwOTYsIm5iZiI6MTcyMDQyNjc5NiwicGF0aCI6Ii81MTM0ODA5My8zNDUwODMyMDUtZTViOTRlYzgtMzk1Mi00OWY5LWI0NTAtZWU4MjNmMGE1YmU0LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzA4VDA4MTk1NlomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWNhMjljMmMyOWI3NzI4MDQ0Y2Y1MDkzNThhM2IxMWQ5OTQ2Njg4ZTUxYzc3ZTZmYTBjYzk2YmE1NjRiNTA1OGYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Wzn9vzyM-WZoR1R_9YLun5wkcdWFGH3pSrUKN9WFjjk">
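To make the quoted explanation concrete, here is a minimal Python sketch of why the paths are written as `$.city` rather than being rooted at the whole message. It is only an illustration, not SeaTunnel code: `read_row` and `simple_json_path` are hypothetical helpers, the `name` key in the sample message is invented, and the sketch assumes the `address` column arrives downstream as a JSON string, as the quoted comment describes.

```python
import json

# Hypothetical Kafka message value; the "name" key is invented for illustration.
message = json.dumps({
    "name": "example",
    "address": {
        "streetAddress": "naist street",
        "city": "Nara",
        "postalCode": "630-0192",
    },
})


def read_row(raw, columns):
    """Keep only the declared columns, serializing each value back to a JSON string."""
    record = json.loads(raw)
    return {column: json.dumps(record[column]) for column in columns}


def simple_json_path(document, path):
    """Resolve a dotted path such as `$.city`; a toy stand-in for a real JSONPath engine."""
    if not path.startswith("$."):
        raise ValueError("only paths of the form $.a.b are supported in this sketch")
    node = json.loads(document)
    for key in path[2:].split("."):
        node = node[key]
    return node


# The schema declares only `address`, so that is all the transform sees.
row = read_row(message, ["address"])

# The path is evaluated against the `address` value, not the original message.
city = simple_json_path(row["address"], "$.city")
street = simple_json_path(row["address"], "$.streetAddress")
```

In this sketch a path such as `$.address.city` would fail with a `KeyError`, because the document being queried is already the contents of `address`.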
Writing it this way does retrieve the values. However, once the fields are
extracted there is another problem: when syncing to JDBC, the generated SQL
also writes the source field into the statement, so you have to add an extra
SQL transform yourself. See my other reply for screenshots.
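
The effect described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not SeaTunnel's JDBC sink: `build_insert` is a hypothetical helper that builds an INSERT from every column in the row, the table name `address_table` is invented, and `address` is taken to be the source field because that is the `src_field` in the quoted configuration.

```python
def build_insert(table, row):
    """Build a parameterized INSERT that uses every column present in the row."""
    columns = list(row)
    placeholders = ", ".join("?" for _ in columns)
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"


# Row as it might look after the JsonPath transform: the source field is still there.
row_after_json_path = {
    "address": '{"streetAddress": "naist street", "city": "Nara", "postalCode": "630-0192"}',
    "streetAddress": "naist street",
    "city": "Nara",
    "postalCode": "630-0192",
}

# Without a projection, the source field ends up in the generated statement.
sql_with_source = build_insert("address_table", row_after_json_path)

# An extra projection step (the role of the additional SQL transform) drops it.
projected = {k: v for k, v in row_after_json_path.items() if k != "address"}
sql_without_source = build_insert("address_table", projected)
```

The second statement lists only `streetAddress`, `city`, and `postalCode`, which is the result the extra SQL transform is meant to achieve.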