[ https://issues.apache.org/jira/browse/FLINK-24494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437208#comment-17437208 ]
Biao Geng edited comment on FLINK-24494 at 11/2/21, 9:56 AM:
-------------------------------------------------------------
Hi [~mahen], I have met this problem as well. I implemented a simple catalog that connects to a Schema Registry (SR) service to fetch the schema string of a Kafka topic whose format is Confluent Avro. I also used the 'AvroSchemaConverter#convertToDataType()' method to parse the Confluent Avro schema string from SR, so I can get the CatalogTable directly from my catalog. It works well in a Flink SQL job like 'SELECT * FROM xx_topic'. However, when I tried to insert new records into the topic by executing SQL statements like 'INSERT INTO xx_topic SELECT * FROM xx_topic LIMIT 2', though the SQL job itself works, it is strange to see in the SR API that a new schema version is generated whose name is 'record' and whose namespace is erased. This matches [~MartijnVisser]'s description.

I wonder if there is any reason for hardcoding the record name in the schema as 'record', and whether there is any plan to fix this. Thank you very much.

> Avro Confluent Registry SQL kafka connector fails to write to the topic with
> schema
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-24494
>                 URL: https://issues.apache.org/jira/browse/FLINK-24494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Mahendran Ponnusamy
>            Priority: Critical
>         Attachments: Screen Shot 2021-10-12 at 10.38.55 AM.png,
> image-2021-10-12-12-09-30-374.png, image-2021-10-12-12-11-04-664.png,
> image-2021-10-12-12-18-53-016.png, image-2021-10-12-12-19-37-227.png,
> image-2021-10-12-12-21-02-008.png, image-2021-10-12-12-21-46-177.png
>
>
> *Summary:*
> Given a schema registered to a topic with a name and namespace,
> when the Flink SQL job with the upsert-kafka connector writes to the topic,
> it fails because the row it tries to produce is not compatible with the registered schema.
>
> *Root cause:*
> The upsert-kafka connector auto-generates a schema with the +*name `record` and no namespace*+. The schema below is generated by the connector.
> I expect the connector to pull the schema from the subject and serialize using that schema (e.g. via a ConfluentAvroRowSerialization, which I believe does not exist today).
> Schema generated by the upsert-kafka connector, which uses AvroRowSerializer internally:
> !image-2021-10-12-12-21-46-177.png|width=813,height=23!
> Schema Registered to the subject:
> {
>   "type" : "record",
>   "name" : "SimpleCustomer",
>   "namespace" : "com...example.model",
>   "fields" : [ {
>     "name" : "customerId",
>     "type" : "string"
>   }, {
>     "name" : "age",
>     "type" : "int",
>     "default" : 0
>   } ]
> }
>
> Table SQL with upsert-kafka connector:
> !image-2021-10-12-12-18-53-016.png|width=351,height=176!
>
> The name "record" hardcoded:
> !image-2021-10-12-12-21-02-008.png|width=464,height=138!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
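The root cause described above can be illustrated with a small stand-alone sketch. This is not Flink code: it simply compares the registered schema from the issue with the shape of the schema the connector generates (same fields, but name hardcoded to "record" and no namespace), using Avro's full-name rule (namespace.name). The field lists match, yet the record full names differ, which is why the registry sees a new, incompatible schema version. The elided namespace "com...example.model" is kept verbatim from the issue.

```python
import json

# Registered schema, as shown in the issue (namespace elided in the original).
registered = json.loads("""
{
  "type": "record",
  "name": "SimpleCustomer",
  "namespace": "com...example.model",
  "fields": [
    {"name": "customerId", "type": "string"},
    {"name": "age", "type": "int", "default": 0}
  ]
}
""")

# Shape of the schema the connector generates: identical fields, but the
# record name is hardcoded to "record" and the namespace is dropped.
generated = json.loads("""
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "customerId", "type": "string"},
    {"name": "age", "type": "int", "default": 0}
  ]
}
""")

def full_name(schema: dict) -> str:
    """Avro full name of a record: '<namespace>.<name>' when a namespace is set."""
    ns = schema.get("namespace")
    return f"{ns}.{schema['name']}" if ns else schema["name"]

print(full_name(registered))                        # com...example.model.SimpleCustomer
print(full_name(generated))                         # record
print(registered["fields"] == generated["fields"])  # True
```

Even though the fields are byte-for-byte identical, Avro identifies a record type by its full name, so the registry treats the connector's output as a different (and incompatible) schema.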