FuYouJ commented on issue #7084:
URL: https://github.com/apache/seatunnel/issues/7084#issuecomment-2216668053
> > I spent about two days setting up Kafka to reproduce your issue, because I
> > wanted to understand what Kafka messages look like by the time they reach
> > the transform.
> >
> > First, your Kafka source is configured with `format = json` and a schema
> > that declares the `address` column, so the Kafka reader extracts the
> > `address` value from each JSON message. Downstream, the data is therefore:
> > ```json
> > {
> >   "streetAddress": "naist street",
> >   "city": "Nara",
> >   "postalCode": "630-0192"
> > }
> > ```
> >
> > <img alt="image" width="54.140625" height="21"
src="https://private-user-images.githubusercontent.com/51348093/345082395-41122434-492e-4fe4-a77f-34a74fa0d01c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjA0MjcwOTYsIm5iZiI6MTcyMDQyNjc5NiwicGF0aCI6Ii81MTM0ODA5My8zNDUwODIzOTUtNDExMjI0MzQtNDkyZS00ZmU0LWE3N2YtMzRhNzRmYTBkMDFjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzA4VDA4MTk1NlomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTk5NTExNDRmZGI3YTdlNDc2Y2EyODUzZmI5YjA0ZmRhNTdmYzI0ZTAxY2M2MDUxNjRkYTk4OGY1OTUxMzA1MmUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.pkuZn5hnHcDxSQ7azGBn3vGAmta_Bh0lUr6NhA0_Xoc">
> > Knowing this, the JSONPath configuration has to be modified so that each
> > path is relative to the extracted `address` value. My modifications are as
> > follows:
> > ```
> > env {
> >   parallelism = 2
> >   job.mode = "BATCH"
> > }
> >
> > source {
> >   Kafka {
> >     schema = {
> >       fields {
> >         address = "string"
> >       }
> >     }
> >     format = json
> >     topic = "test2"
> >     bootstrap.servers = "localhost:9092"
> >     kafka.config = {
> >       max.poll.records = 500
> >       auto.offset.reset = "earliest"
> >     }
> >   }
> > }
> >
> > transform {
> >   JsonPath {
> >     columns = [
> >       {
> >         src_field = "address"
> >         path = "$.streetAddress"
> >         dest_field = "streetAddress"
> >       },
> >       {
> >         src_field = "address"
> >         path = "$.city"
> >         dest_field = "city"
> >       },
> >       {
> >         src_field = "address"
> >         path = "$.postalCode"
> >         dest_field = "postalCode"
> >       }
> >     ]
> >   }
> > }
> >
> > sink {
> >   Console {}
> > }
> > ```
> >
> > With this configuration the program runs correctly, as shown in the
> > picture:
> > <img alt="image" width="54.140625" height="21"
src="https://private-user-images.githubusercontent.com/51348093/345083205-e5b94ec8-3952-49f9-b450-ee823f0a5be4.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjA0MjcwOTYsIm5iZiI6MTcyMDQyNjc5NiwicGF0aCI6Ii81MTM0ODA5My8zNDUwODMyMDUtZTViOTRlYzgtMzk1Mi00OWY5LWI0NTAtZWU4MjNmMGE1YmU0LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzA4VDA4MTk1NlomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWNhMjljMmMyOWI3NzI4MDQ0Y2Y1MDkzNThhM2IxMWQ5OTQ2Njg4ZTUxYzc3ZTZmYTBjYzk2YmE1NjRiNTA1OGYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Wzn9vzyM-WZoR1R_9YLun5wkcdWFGH3pSrUKN9WFjjk">
>
> Written this way, the values can be extracted. Once the fields are
> available, though, there is another problem: when syncing to JDBC, the
> generated SQL also writes the source field into the statement, so you have
> to add an extra SQL transform yourself. See my other reply for screenshots.
I deliberately designed it this way, following the other transforms: the
original column is kept, and the parsed content is written to new columns.
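
If you do not want the original column in the sink, the extra SQL step you mention could look roughly like the sketch below. This is only an illustration, not a tested job: the table names `parsed` and `projected` are hypothetical, and the option names `source_table_name` / `result_table_name` assume a SeaTunnel 2.3.x release, so please check them against the version you are running.

```
transform {
  JsonPath {
    # Hypothetical table name; the Sql transform below reads from it.
    result_table_name = "parsed"
    columns = [
      {
        src_field = "address"
        path = "$.streetAddress"
        dest_field = "streetAddress"
      },
      {
        src_field = "address"
        path = "$.city"
        dest_field = "city"
      },
      {
        src_field = "address"
        path = "$.postalCode"
        dest_field = "postalCode"
      }
    ]
  }

  Sql {
    source_table_name = "parsed"
    # Hypothetical table name for the projected rows.
    result_table_name = "projected"
    # Select only the new columns, so `address` is not passed on to the sink.
    query = "select streetAddress, city, postalCode from parsed"
  }
}
```

The sink would then need to read from `projected` (again an assumed name) so that only the three extracted columns end up in the generated SQL.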