[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159791#comment-17159791
 ] 

Dawid Wysakowicz commented on FLINK-16048:
------------------------------------------

It is (either DDL or catalog). But you need to pass that schema from DDL to the 
DeserializationSchema.

Just to be clear, because we might be talking about slightly different things 
here. 
1) This should not be user provided option. It should not be specified by the 
user.
2) The DeserializationSchema needs that property. However the FormatFactory 
should probably get it from the {{CatalogTable#getSchema}}. If you meant that 
than you are right and I misunderstood you.

> Support read/write confluent schema registry avro data  from Kafka
> ------------------------------------------------------------------
>
>                 Key: FLINK-16048
>                 URL: https://issues.apache.org/jira/browse/FLINK-16048
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.12.0
>
>
> *The background*
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]
> *The format details*
> _The factory identifier (or format id)_
> There are 2 candidates now ~
> - {{avro-sr}}: the pattern borrowed from KSQL {{JSON_SR}} format [1]
> - {{avro-confluent}}: the pattern borrowed from Clickhouse {{AvroConfluent}} 
> [2]
> Personally i would prefer {{avro-sr}} because it is more concise and the 
> confluent is a company name which i think is not that suitable for a format 
> name.
> _The format attributes_
> || Options || required || Remark ||
> | schema-string | true | avro schema string used for (de)serialization |
> | schema-registry.url | true | URL to connect to schema registry service |
> | schema-registry.subject | false | Subject name to write to the Schema 
> Registry service, required for sink |



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to