I would be in favor of buffering data outside of the checkpoint lock. In my
experience, serialization is always the biggest performance killer in user
code, and I have a hard time believing that in practice anyone is going to
buffer so many records that it causes real memory concerns.
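For illustration, a minimal sketch of that approach (the BufferedEmit,
RecordDeserializer, and Output types below are made up for the example, not
Flink's actual source internals):

import java.util.ArrayList;
import java.util.List;

public class BufferedEmit<T> {

    private final Object checkpointLock = new Object();

    interface RecordDeserializer<T> {
        T deserialize(byte[] raw) throws Exception;
    }

    interface Output<T> {
        void emit(T record);
    }

    void processBatch(
            List<byte[]> rawRecords,
            RecordDeserializer<T> deserializer,
            Output<T> output) throws Exception {
        // The expensive work (deserialization) happens outside the lock.
        List<T> buffer = new ArrayList<>(rawRecords.size());
        for (byte[] raw : rawRecords) {
            buffer.add(deserializer.deserialize(raw));
        }
        // Only the cheap emission (plus offset/state updates in a real
        // source) is done while holding the checkpoint lock.
        synchronized (checkpointLock) {
            for (T record : buffer) {
                output.emit(record);
            }
        }
    }
}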

To add to Timo's point, Statefun actually did exactly that on its Kinesis
ser/de interfaces [1,2].
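The rough shape of that pattern, heavily simplified (illustrative names only,
not the exact Statefun signatures; [1] and [2] link to the real interfaces),
is to hand the deserializer a single record object that carries the payload
plus all per-record metadata:

import java.io.Serializable;

// Illustrative sketch only; see [1] and [2] for the real interfaces.
public interface IngressDeserializer<T> extends Serializable {

    // One record object instead of a long parameter list, so metadata
    // fields can be added later without breaking implementations.
    T deserialize(IngressRecord record);

    interface IngressRecord {
        byte[] data();

        String stream();

        String shardId();

        String partitionKey();

        String sequenceNumber();

        long approximateArrivalTimestamp();
    }
}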

Seth

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java


On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:

> Hi Dawid,
>
> thanks for this FLIP. This solves a lot of issues with the current
> design for both Flink contributors and users. +1 for this.
>
> Some minor suggestions from my side:
> - How about finding something shorter for `InitializationContext`? Maybe
> just `OpenContext`?
> - While introducing default methods for existing interfaces, shall we
> also create contexts for those methods? I see the following method in
> your FLIP and wonder if we can reduce the number of parameters while
> introducing a new method:
>
> deserialize(
>              byte[] recordValue,
>              String partitionKey,
>              String seqNum,
>              long approxArrivalTimestamp,
>              String stream,
>              String shardId,
>              Collector<T> out)
>
> to:
>
> deserialize(
>              byte[] recordValue,
>              Context c,
>              Collector<T> out)
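>
> Just to make that concrete, a rough sketch of what such a Context could
> expose (purely illustrative, not a proposal for the final API):
>
> interface Context {
>     String partitionKey();
>     String sequenceNumber();
>     long approximateArrivalTimestamp();
>     String stream();
>     String shardId();
> }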
>
> What do you think?
>
> Regards,
> Timo
>
>
>
> On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > Hi devs,
> >
> > While working on improving the Table API/SQL connectors, we ran into a
> > few shortcomings of the DeserializationSchema and SerializationSchema
> > interfaces. Similar feature requests have also been raised by other
> > users in the past. The shortcomings I would like to address with this
> > FLIP include:
> >
> >   * Emitting 0 to m records from the deserialization schema with
> >     per-partition watermarks
> >       o https://github.com/apache/flink/pull/3314#issuecomment-376237266
> >       o differentiate null value from no value
> >       o support for Debezium CDC format
> >         (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
> >
> >   * A way to initialize the schema
> >       o establish external connections
> >       o generate code on startup
> >       o no need for lazy initialization
> >
> >   * Access to metrics
> >     [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
> >
> > One important aspect I would like to hear your opinion on is how to
> > support the Collector interface in the Kafka source, assuming of course
> > that we agree to add the Collector to the DeserializationSchema.
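> >
> > To make that direction concrete, here is a rough sketch of how the
> > interface could evolve (a sketch only; the bridging default method and
> > the context contents are illustrative, the actual design is on the FLIP
> > page below):
> >
> > import java.io.IOException;
> > import java.io.Serializable;
> >
> > import org.apache.flink.metrics.MetricGroup;
> > import org.apache.flink.util.Collector;
> >
> > public interface DeserializationSchema<T> extends Serializable {
> >
> >     // Setup hook: open connections, generate code, register metrics.
> >     default void open(InitializationContext context) throws Exception {}
> >
> >     // Existing single-record contract, kept for compatibility.
> >     T deserialize(byte[] message) throws IOException;
> >
> >     // New contract: emit 0 to m records per message via the Collector.
> >     // The default bridges to the old method so existing schemas work.
> >     default void deserialize(byte[] message, Collector<T> out) throws IOException {
> >         T record = deserialize(message);
> >         if (record != null) {
> >             out.collect(record);
> >         }
> >     }
> >
> >     boolean isEndOfStream(T nextElement);
> >
> >     // Sketch of the setup context (see the FLIP for the real definition).
> >     interface InitializationContext {
> >         MetricGroup getMetricGroup();
> >     }
> > }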
> >
> > The FLIP can be found here:
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> >
> > Looking forward to your feedback.
> >
> > Best,
> >
> > Dawid
> >
>
>
