> if you strip the magic byte, and the schema has > evolved when you're consuming it from Flink, > you can end up with deserialization errors given > that a field might have been deleted/added/ > changed etc.
Aren’t we already fairly dependent on the schema remaining consistent, because otherwise we’d need to update the table schema as well? > it wouldn't work when you actually want to > write avro-confluent, because that requires a > check when producing if you're still being compliant. I’m not sure what you mean here, sorry. Are you thinking about issues if you needed to mix-and-match with both formatters at the same time? (Rather than just using the Avro formatter as I was describing) Kind regards Dale From: Martijn Visser <martijnvis...@apache.org> Date: Friday, 27 October 2023 at 14:03 To: dev@flink.apache.org <dev@flink.apache.org> Subject: [EXTERNAL] Re: [DISCUSS] Confluent Avro support without Schema Registry access Hi Dale, I'm struggling to understand in what cases you want to read data serialized in connection with Confluent Schema Registry, but can't get access to the Schema Registry service. It seems like a rather exotic situation and it beats the purposes of using a Schema Registry in the first place? I also doubt that it's actually really useful: if you strip the magic byte, and the schema has evolved when you're consuming it from Flink, you can end up with deserialization errors given that a field might have been deleted/added/changed etc. Also, it wouldn't work when you actually want to write avro-confluent, because that requires a check when producing if you're still being compliant. Best regards, Martijn On Fri, Oct 27, 2023 at 2:53 PM Dale Lane <dale.l...@uk.ibm.com> wrote: > > TLDR: > We currently require a connection to a Confluent Schema Registry to be able > to work with Confluent Avro data. With a small modification to the Avro > formatter, I think we could also offer the ability to process this type of > data without requiring access to the schema registry. > > What would people think of such an enhancement? > > ----- > > When working with Avro data, there are two formats available to us: avro and > avro-confluent. > > avro > Data it supports: Avro records > Approach: You specify a table schema and it derives an appropriate Avro > schema from this. > > avro-confluent > Data it supports: Confluent’s variant[1] of the Avro encoding > Approach: You provide connection details (URL, credentials, > keystore/truststore, schema lookup strategy, etc.) for retrieving an > appropriate schema from the Confluent Schema Registry. > > What this means is if you have Confluent Avro data[2] that you want to use in > Flink, you currently have to use the avro-confluent format, and that means > you need to provide Flink with access to your Schema Registry. > > I think there will be times where you may not want, or may not be able, to > provide Flink with direct access to a Schema Registry. In such cases, it > would be useful to support the same behaviour that the avro format does (i.e. > allow you to explicitly specify a table schema) > > This could be achieved with a very minor modification to the avro formatter. > > For reading records, we could add an option to the formatter to highlight > when records will be Confluent Avro. If that option is set, we just need the > formatter to skip the first bytes with the schema ID/version (it can then use > the remaining bytes with a regular Avro decoder as it does today – the > existing implementation would be essentially unchanged). > > For writing records, something similar would work. An option to the formatter > to highlight when to write records using Confluent Avro. We would need a way > to specify what ID value to use for the first bytes [3]. (After that, the > record can be encoded with a regular Avro encoder as it does today – the rest > of the implementation would be unchanged). > > > ----- > [1] – This is the same as regular Avro, but prefixing the payload with extra > bytes that identify which schema to use, to allow an appropriate schema to be > retrieved from a schema registry. > > [2] – Records that were serialized by > io.confluent.kafka.serializers.KafkaAvroSerializer and could be read by > io.confluent.kafka.serializers.KafkaAvroDeserializer. > > [3] – Either by making them fixed options for that formatter, or by allowing > it to be specified from something in the record. > > Unless otherwise stated above: > > IBM United Kingdom Limited > Registered in England and Wales with number 741598 > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU Unless otherwise stated above: IBM United Kingdom Limited Registered in England and Wales with number 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU