[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159773#comment-17159773
 ] 

Dawid Wysakowicz commented on FLINK-16048:
------------------------------------------

[~ykt836] IMO, we need both. The *schema-string* is the schema that the program 
expects. I think of it as the schema of the Table that was frozen at compile 
time. It is either the schema of the {{SpecificRecord}} we work with 
(irrelevant from the point of view of RowData) or the latest schema retrieved 
from the schema-registry when compiling the query. It is needed to e.g. provide 
a default value for  a field that was introduced in a later version of schema. 
Imagine a case:

Schema 1: has fields "field1", "field2"
Schema 2: added "field3" with default value "ABC" <- we executed a query at 
this point in time and we use the "field3"

If we do not pass the schema 2 as the "schema-string", then when we encounter a 
record written with Schema 1, we don't know how to provide the "field3" and our 
query fails.

> Support read/write confluent schema registry avro data  from Kafka
> ------------------------------------------------------------------
>
>                 Key: FLINK-16048
>                 URL: https://issues.apache.org/jira/browse/FLINK-16048
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to