[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162464#comment-17162464
 ] 

Danny Chen commented on FLINK-16048:
------------------------------------

The schema string can be inferred from the DDL table schema, just as our 
existing avro format does. One thing users need to be careful about is that, 
for the sink table, the nullability of the fields must exactly match the avro 
schema, but Flink and Avro have different default nullability strategies 
(Flink defaults to nullable, Avro to non-nullable).
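
A minimal sketch of the nullability point above, not the actual Flink 
converter: it derives an Avro record schema from a list of table columns and 
wraps every nullable column in a union with "null". The type mapping, the 
function names and the example columns are all assumptions made up for 
illustration.

```python
import json

# Hypothetical mapping from a few Flink SQL type names to Avro primitive types.
FLINK_TO_AVRO = {
    "STRING": "string",
    "INT": "int",
    "BIGINT": "long",
    "DOUBLE": "double",
    "BOOLEAN": "boolean",
}


def avro_field(name, flink_type, nullable=True):
    """Build one Avro field definition.

    A Flink column is nullable unless declared NOT NULL, while an Avro type
    is non-null unless it is a union that includes "null".
    """
    avro_type = FLINK_TO_AVRO[flink_type]
    if nullable:
        # Nullable column: union with "null" first, plus a null default.
        return {"name": name, "type": ["null", avro_type], "default": None}
    return {"name": name, "type": avro_type}


def avro_schema(record_name, columns):
    """Build an Avro record schema from (name, flink_type, nullable) tuples."""
    return {
        "type": "record",
        "name": record_name,
        "fields": [avro_field(*column) for column in columns],
    }


# Example table: `id BIGINT NOT NULL, name STRING` (nullable by default).
columns = [("id", "BIGINT", False), ("name", "STRING", True)]
schema = avro_schema("record", columns)
print(json.dumps(schema, indent=2))
```

Here `id` maps to the plain Avro type "long" and `name` to the union 
["null", "string"]. A sink table that declares `name` without NOT NULL 
therefore would not match a registered schema in which `name` is a plain 
"string".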

The Cloudera schema registry uses a different code path from the Confluent 
schema registry, so I think there is no possibility that it shares the same 
format id with the Confluent schema registry. But I still vote for avro-sr if 
supporting the Cloudera avro schema is not on our roadmap; "confluent" is too 
verbose, and Seth Wiesman feels the same way.

> Support read/write confluent schema registry avro data  from Kafka
> ------------------------------------------------------------------
>
>                 Key: FLINK-16048
>                 URL: https://issues.apache.org/jira/browse/FLINK-16048
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.12.0
>
>
> *The background*
> I found that the SQL Kafka connector cannot consume avro data that was 
> serialized by `KafkaAvroSerializer`; it can only consume Row data with an 
> avro schema, because we use 
> `AvroRowDeserializationSchema/AvroRowSerializationSchema` to serialize and 
> deserialize data in `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka, and someone ran into the same problem on Stack Overflow [1].
> [1] https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally I would prefer {{avro-sr}} because it is more concise, and 
> Confluent is a company name, which I think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |
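
For context on the background in the quoted description: Confluent documents 
that `KafkaAvroSerializer` prepends a 5-byte header (a magic byte 0 followed 
by a 4-byte big-endian schema id) to the Avro binary payload. A plain Avro 
decoder would read those header bytes as data, which is a likely cause of the 
"Malformed data. Length is negative" error in the Stack Overflow question. 
The sketch below only illustrates that framing; the function names are 
hypothetical and this is not the Flink implementation.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema id


def frame_confluent_message(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the magic byte and schema id to an Avro binary payload."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def split_confluent_message(message: bytes):
    """Return (schema_id, avro_payload) from a framed message."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


# Round trip with a made-up schema id and made-up payload bytes.
framed = frame_confluent_message(42, b"\x02\x06foo")
schema_id, payload = split_confluent_message(framed)
```

A deserializer for this format would use the extracted schema id to look up 
the writer schema at `schema-registry.url` and then decode only the remaining 
payload bytes with Avro.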



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
