Hi,
I have posted a Google Doc [0] to the mailing list for a discussion thread for 
a Flip proposal to introduce a Apicurio-avro format. The discussions have been 
resolved, please could a committer/PMC member copy the contents from the Google 
Doc, and create a FLIP number for this,. as per the process [1],
      Kind regards, David.
[0]
https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-CreateyourOwnFLIP

From: Jeyhun Karimov <je.kari...@gmail.com>
Date: Friday, 22 March 2024 at 13:05
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
Hi David,

Thanks a lot for clarification.
Sounds good to me.

Regards,
Jeyhun

On Fri, Mar 22, 2024 at 10:54 AM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi Jeyhun,
> Thanks for your feedback.
>
> So for outbound messages, the message includes the global ID. We register
> the schema and match on the artifact id. So if the schema then evolved,
> adding a new  version, the global ID would still be unique and the same
> version would be targeted. If you wanted to change the Flink table
> definition in line with a higher version, then you could do this – the
> artifact id would need to match for it to use the same schema and a higher
> artifact version would need to be provided. I notice that Apicurio has
> rules around compatibility that you can configure, I suppose if we attempt
> to create an artifact that breaks these rules , then the register schema
> will fail and the associated operation should fail (e.g. an insert). I have
> not tried this.
>
>
> For inbound messages, using the global id in the header – this targets one
> version of the schema. I can create different messages on the topic built
> with different schema versions, and I can create different tables in Flink,
> as long as the reader and writer schemas are compatible as per the
> https://github.com/apache/flink/blob/779459168c46b7b4c600ef52f99a5435f81b9048/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L109
> Then this should work.
>
> Does this address your question?
>     Kind regards, David.
>
>
> From: Jeyhun Karimov <je.kari...@gmail.com>
> Date: Thursday, 21 March 2024 at 21:06
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hi David,
>
> Thanks for the FLIP. +1 for it.
> I have a minor comment.
>
> Can you please elaborate more on mechanisms in place to ensure data
> consistency and integrity, particularly in the event of schema conflicts?
> Since each message includes a schema ID for inbound and outbound messages,
> can you elaborate more on message consistency in the context of schema
> evolution?
>
> Regards,
> Jeyhun
>
>
>
>
>
> On Wed, Mar 20, 2024 at 4:34 PM David Radley <david...@apache.org> wrote:
>
> > Thank you very much for your feedback Mark. I have made the changes in
> the
> > latest google document. On reflection I agree with you that the
> > globalIdPlacement format configuration should apply to the
> deserialization
> > as well, so it is declarative. I am also going to have a new
> configuration
> > option to work with content IDs as well as global IDs. In line with the
> > deser Apicurio IdHandler and headerHandlers.
> >
> >  kind regards, David.
> >
> >
> > On 2024/03/20 15:18:37 Mark Nuttall wrote:
> > > +1 to this
> > >
> > > A few small comments:
> > >
> > > Currently, if users have Avro schemas in an Apicurio Registry (an open
> > source Apache 2 licensed schema registry), then the natural way to work
> > with those Avro flows is to use the schemas in the Apicurio Repository.
> > > 'those Avro flows' ... this is the first reference to flows.
> > >
> > > The new format will use the global Id to look up the Avro schema that
> > the message was written during deserialization.
> > > I get the point, phrasing is awkward. Probably you're more interested
> in
> > content than word polish at this point though.
> > >
> > > The Avro Schema Registry (apicurio-avro) format
> > > The Confluent format is called avro-confluent; this should be
> > avro-apicurio
> > >
> > > How to create tables with Apicurio-avro format
> > > s/Apicurio-avro/avro-apicurio/g
> > >
> > > HEADER – globalId is put in the header
> > > LEGACY– global Id is put in the message as a long
> > > CONFLUENT - globalId is put in the message as an int.
> > > Please could we specify 'four-byte int' and 'eight-byte long' ?
> > >
> > > For a Kafka source the globalId will be looked for in this order:
> > > -     In the header
> > > -     After a magic byte as an int
> > > -     After a magic byte as a long.
> > > but apicurio-avro.globalid-placement has a default value of HEADER :
> why
> > do we have a search order as well? Isn't apicurio-avro.globalid-placement
> > enough? Don't the two mechanisms conflict?
> > >
> > > In addition to the types listed there, Flink supports reading/writing
> > nullable types. Flink maps nullable types to Avro union(something, null),
> > where something is the Avro type converted from Flink type.
> > > Is that definitely the right way round? I know we've had multiple
> > conversations about how unions work with Flink
> > >
> > >  This is because the writer schema is expanded, but this could not
> > complete if there are circularities.
> > > I understand your meaning but the sentence is awkward.
> > >
> > > The registered schema will be created or if it exists be updated.
> > > same again
> > >
> > > At some stage the lowest Flink level supported by the Kafka connector
> > will contain the additionalProperties methods in code flink.
> > > wording
> > >
> > > There existing Kafka deserialization for the writer schema passes down
> > the message body to be deserialised.
> > > wording
> > >
> > > @Override
> > > public void deserialize(ConsumerRecord<byte[], byte[]> message,
> > Collector<T> out)
> > >       throws IOException {
> > >       Map<String, Object> additionalPropertiesMap =  new HashMap<>();
> > >       for (Header header : message.additionalProperties()) {
> > >       headersMap.put(header.key(), header.value());
> > >       }
> > >       deserializationSchema.deserialize(message.value(), headersMap,
> > out);
> > > }
> > > This fails to compile at headersMap.
> > >
> > > The input stream and additionalProperties will be sent so the Apicurio
> > SchemaCoder which will try getting the globalId from the headers, then 4
> > bytes from the payload then 8 bytes from the payload.
> > > I'm still stuck on apicurio-avro.globalid-placement having a default
> > value of HEADER . Should we try all three, or fail if this config param
> has
> > a wrong value?
> > >
> > > Other considerations
> > > The implementation does not use the Apicurio deser libraries,
> > > Please can we refer to them as SerDes; this is the term used within the
> > documentation that you link to
> > >
> > >
> > > On 2024/03/20 10:09:08 David Radley wrote:
> > > > Hi,
> > > > As per the FLIP process I would like to raise a FLIP, but do not have
> > authority, so have created a google doc for the Flip to introduce a new
> > Apicurio Avro format. The document is
> >
> https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
> > > >
> > > > I have prototyped a lot of the content to prove that this approach is
> > feasible. I look forward to the discussion,
> > > >       Kind regards, David.
> > > >
> > > >
> > > >
> > > > Unless otherwise stated above:
> > > >
> > > > IBM United Kingdom Limited
> > > > Registered in England and Wales with number 741598
> > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
> > > >
> > >
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Reply via email to