Hi Arvid,

The ConfluentRegistryAvroDeserializationSchema calls checkAvroInitialized()
on every single record just to initialize the schema the first time around.
This is a clear sign of a missing open/configure method. In addition, some
Kafka serializers rely on properties that are usually passed together with
the Kafka configuration. Adding a configure method that receives the Kafka
properties gives a familiar way of implementing this without having to pass
the properties twice (once for the Producer/Consumer and once for the
schema).
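
To sketch what I mean, here is a minimal wrapper (the wrapper class is
made up for illustration; the Kafka Deserializer interface it delegates to
is real):

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical wrapper schema that forwards the Kafka properties to a
// wrapped Kafka Deserializer through the proposed configure/close hooks.
public class WrappingDeserializationSchema<T> {

    private final Deserializer<T> inner;

    public WrappingDeserializationSchema(Deserializer<T> inner) {
        this.inner = inner;
    }

    // Called once on operator start with the same properties that were
    // passed to the Kafka Producer/Consumer.
    public void configure(Map<String, ?> configs) {
        inner.configure(configs, false /* isKey */);
    }

    public T deserialize(byte[] message) {
        return inner.deserialize(null /* topic */, message);
    }

    // Called on shutdown/failure so the wrapped deserializer can release
    // its resources.
    public void close() {
        inner.close();
    }
}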

The example you mentioned doesn't implement any closing logic. Imagine if
the schema registry client created a background thread to fetch data and
had to be shut down: there is no way to do that now.
The Confluent schema registry client doesn't work this way, but other
registries might.
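
As a made-up example of such a client (again, this is not how the
Confluent client actually works):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical registry client that refreshes its schema cache from a
// background thread and therefore needs an explicit shutdown.
public class PollingRegistryClient implements AutoCloseable {

    private final ScheduledExecutorService refresher =
        Executors.newSingleThreadScheduledExecutor();

    public PollingRegistryClient() {
        refresher.scheduleAtFixedRate(this::refreshSchemas, 0, 60, TimeUnit.SECONDS);
    }

    private void refreshSchemas() {
        // fetch the latest schemas from the registry ...
    }

    @Override
    public void close() {
        // Without a close() on the schema interface, there is no place
        // to call this from.
        refresher.shutdownNow();
    }
}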

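To make the optional-methods idea from my original mail below concrete,
the additions could be default no-ops, so schemas implemented as lambdas
keep compiling (a sketch; the interface name is made up):

import java.util.Map;

// Hypothetical interface extension: the default no-op implementations
// keep existing schemas, including lambda-based ones, source compatible.
public interface SchemaLifecycle {

    default void configure(Map<String, ?> configs) {}

    default void close() {}
}
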
I hope this answers your question.

Gyula


On Thu, Sep 5, 2019 at 10:01 AM Arvid Heise <ar...@data-artisans.com> wrote:

> Hi Gyula,
>
> when looking at the ConfluentRegistryAvroDeserializationSchema [1], it
> seems like the intended way is to pass all configuration parameters in the
> constructor. So you could call open there.
>
> Could you please outline in more detail why this is not enough? What
> would you do in open and close, respectively?
>
> Best,
>
> Arvid
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
>
> On Thu, Sep 5, 2019 at 9:43 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi all!
> >
> > While implementing a new custom Flink serialization schema that wraps an
> > existing Kafka serializer, I realized we are missing two key methods that
> > could easily be added:
> >
> > void configure(java.util.Map<java.lang.String,?> configs);
> > void close();
> >
> > We could rename configure to open, but Kafka serializers already have a
> > configure method.
> > The configure method would be called when the operator starts, with the
> > provided Kafka properties, and close when it shuts down.
> >
> > Currently there is no way to access the properties from the schema
> > interfaces or close the schema on failure.
> >
> > This would be a very simple addition and could be added as optional
> > methods to the interface so as not to break any schemas that are
> > implemented as lambdas.
> >
> > What do you think?
> >
> > Gyula
> >
