[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160999#comment-17160999
 ] 

Danny Chen edited comment on FLINK-16048 at 7/20/20, 8:34 AM:
--------------------------------------------------------------

[~ykt836] Yes, we can infer the read schema from the DDL schema, the schema 
string is not necessary.

[~anshul.bansal] We are planning to implement a schema registry catalog with 
what you do not need to create a table explicitly.

[~dwysakowicz], this may be a problem, but do we plan to support cloudra schema 
registry ?


was (Author: danny0405):
[~ykt836] This PR contributes 2 formats of schema registry avro, one way to 
remove the schema string is to pass in a schema id or the subject name, do the 
inference when the format was instantiated.

We can fetch the avro schema string of the compile time but that was in the 
scope of schema inference, i think it is not good practice to let the format do 
a schema inference, the format se/de schema should be solid. The inference work 
should be done by the framework, either the DDL or the Catalog.

[~dwysakowicz] Personally i agree the schema-string  should not be user 
provided option, but we actually can not recover a schema string from a 
TableSchema, any good ideas ?

One way i can think of is let user specify the subject name PLUS the schema 
version, what do you think ?

> Support read/write confluent schema registry avro data  from Kafka
> ------------------------------------------------------------------
>
>                 Key: FLINK-16048
>                 URL: https://issues.apache.org/jira/browse/FLINK-16048
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to