[ 
https://issues.apache.org/jira/browse/FLINK-24494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437208#comment-17437208
 ] 

Biao Geng edited comment on FLINK-24494 at 11/2/21, 9:10 AM:
-------------------------------------------------------------

Hi [~mahen], I have run into this problem as well.
 I implemented a simple catalog that connects to the Schema Registry (SR) 
service to fetch the schema string of a Kafka topic whose format is Confluent 
Avro. I also used the 'AvroSchemaConverter#convertToDataType()' method to 
parse the Confluent Avro schema string from SR, so I can build the 
CatalogTable directly in my catalog. It works well in a Flink SQL job like 
'SELECT * FROM xx_topic'. However, when I tried to insert new records into the 
topic by executing a statement like 'INSERT INTO xx_topic SELECT * FROM 
xx_topic LIMIT 2', it was strange to see in the SR API that a new schema 
version had been registered whose name is 'record' and whose namespace is 
erased. I believe that matches [~MartijnVisser]'s description.
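
For reference, a minimal sketch of the lookup I described, assuming a local registry; the URL, the subject name 'xx_topic-value', and the class name are placeholders, not my actual catalog code:
{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class SchemaRegistryLookup {
    public static void main(String[] args) throws Exception {
        // Placeholder registry URL; the second argument is the client-side schema cache size.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // Fetch the latest registered schema string for the topic's value subject.
        SchemaMetadata metadata = client.getLatestSchemaMetadata("xx_topic-value");
        // Map the Confluent Avro schema string to a Flink DataType for the CatalogTable.
        DataType rowType = AvroSchemaConverter.convertToDataType(metadata.getSchema());
        System.out.println(rowType);
    }
}
{code}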

I wonder whether there is a reason for hardcoding the record name in the 
generated schema as 'record', and whether there is any plan to fix this.
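
To illustrate what I mean, here is a rough round-trip sketch (not the connector's actual code path; the namespace 'com.example.model' and the class name are placeholders): converting the registered schema to a Flink DataType and back via AvroSchemaConverter drops the original record name and namespace, which seems to be why SR registers an incompatible new version.
{code:java}
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class RecordNameRoundTrip {
    public static void main(String[] args) {
        // Placeholder schema modeled on the one in this ticket.
        String registered = "{\"type\":\"record\",\"name\":\"SimpleCustomer\","
                + "\"namespace\":\"com.example.model\",\"fields\":["
                + "{\"name\":\"customerId\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}";
        // What my catalog does: registered Avro schema -> Flink DataType.
        DataType dataType = AvroSchemaConverter.convertToDataType(registered);
        // What the write path does: Flink row type -> Avro schema.
        // The original record name and namespace are not carried over.
        Schema generated = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
        System.out.println(generated.getFullName()); // not com.example.model.SimpleCustomer
    }
}
{code}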

Thank you very much.



> Avro Confluent Registry SQL Kafka connector fails to write to the topic with 
> schema 
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-24494
>                 URL: https://issues.apache.org/jira/browse/FLINK-24494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Mahendran Ponnusamy
>            Priority: Critical
>         Attachments: Screen Shot 2021-10-12 at 10.38.55 AM.png, 
> image-2021-10-12-12-09-30-374.png, image-2021-10-12-12-11-04-664.png, 
> image-2021-10-12-12-18-53-016.png, image-2021-10-12-12-19-37-227.png, 
> image-2021-10-12-12-21-02-008.png, image-2021-10-12-12-21-46-177.png
>
>
> *Summary:*
> Given a schema registered for a topic with a name and namespace,
> when a Flink SQL job with the upsert-kafka connector writes to the topic,
> it fails because the row it tries to produce is not compatible with the 
> registered schema.
>  
> *Root cause:*
> The upsert-kafka connector auto-generates a schema with the +*name `record` 
> and no namespace*+. The schema below is generated by the connector. I expect 
> the connector to pull the schema from the subject and use a 
> ConfluentAvroRowSerialization [which I believe does not exist today] to 
> serialize using the schema from the subject.
> Schema generated by the upsert-kafka connector, which internally uses 
> AvroRowSerializer:
> !image-2021-10-12-12-21-46-177.png|width=813,height=23!
> Schema registered to the subject:
> {
>   "type" : "record",
>   "name" : "SimpleCustomer",
>   "namespace" : "com...example.model",
>   "fields" : [ {
>     "name" : "customerId",
>     "type" : "string"
>   }, {
>     "name" : "age",
>     "type" : "int",
>     "default" : 0
>   } ]
> }
>  
> Table SQL with the upsert-kafka connector:
> !image-2021-10-12-12-18-53-016.png|width=351,height=176!
>  
> The name "record" is hardcoded:
> !image-2021-10-12-12-21-02-008.png|width=464,height=138!  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
