That sounds reasonable, thanks Matt.

As for the implementation, please note that there is another ongoing PR
that may touch the same classes that you are working on:
https://github.com/apache/kafka/pull/4148

So it may help if you can also take a look at that PR and see if it is
compatible with your changes.



Guozhang


On Tue, Oct 31, 2017 at 10:59 AM, Matt Farmer <m...@frmr.me> wrote:

> I've opened this pull request to implement the KIP as currently written:
> https://github.com/apache/kafka/pull/4165. It still needs some tests added,
> but largely represents the shape I was going for.
>
> If there are more points that folks would like to discuss, please let me
> know. If I don't hear anything by tomorrow afternoon I'll probably start a
> [VOTE] thread.
>
> Thanks,
> Matt
>
> On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>
> > I can’t think of a reason that would be problematic.
> >
> > Most of the time when I write a handler like this, I either want to
> > ignore the error or fail and bring everything down so that I can spin it
> > back up later and resume from earlier offsets. When we start up after
> > crashing we’ll eventually try again to process the message we failed to
> > produce.
> >
> > I’m concerned that “putting in a queue for later” opens you up to putting
> > messages into the destination topic in an unexpected order. However if
> > others feel differently, I’m happy to talk about it.
> >
> > On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> > Please correct me if I'm wrong, but my understanding is that the record
> >> > metadata is always null if an exception occurred while trying to produce.
> >>
> >> That is right. Thanks.
> >>
> >> I looked at the example code, and one thing I realized is that since we
> >> are not passing the context into the handle function, we may not be able
> >> to implement the logic to send the failed records into another queue for
> >> future processing. Would people think that would be a big issue?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote:
> >>
> >> > Hello all,
> >> >
> >> > I've updated the KIP based on this conversation, and made it so that its
> >> > interface, config setting, and parameters line up more closely with the
> >> > interface in KIP-161 (deserialization handler).
> >> >
> >> > I believe there are a few specific questions I need to reply to.....
> >> >
> >> > > The question I had about the handle parameters is around the record:
> >> > > should it be `ProducerRecord<byte[], byte[]>`, or be generics of
> >> > > `ProducerRecord<? extends K, ? extends V>` or
> >> > > `ProducerRecord<? extends Object, ? extends Object>`?
> >> >
> >> > At this point in the code we're guaranteed that this is a
> >> > ProducerRecord<byte[], byte[]>, so the generics would just make it harder
> >> > to work with the key and value.
> >> >
> >> > > Also, should the handle function include the `RecordMetadata` as well in
> >> > > case it is not null?
> >> >
> >> > Please correct me if I'm wrong, but my understanding is that the record
> >> > metadata is always null if an exception occurred while trying to produce.
> >> >
> >> > > We may probably try to write down at least the following handling logic
> >> > > and see if the given API is sufficient for it
> >> >
> >> > I've added some examples to the KIP. Let me know what you think.
> >> >
> >> > Cheers,
> >> > Matt
> >> >
> >> > On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:
> >> >
> >> > > Thanks for this feedback. I’m at a conference right now and am planning on
> >> > > updating the KIP again with details from this conversation later this week.
> >> > >
> >> > > I’ll shoot you a more detailed response then! :)
> >> > >
> >> > > On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> > >
> >> > >> Thanks for the KIP Matt.
> >> > >>
> >> > >> Regarding the handle interface of ProductionExceptionHandlerResponse,
> >> > >> could you write it on the wiki also, along with the actual added config
> >> > >> names (e.g. what
> >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
> >> > >> described).
> >> > >>
> >> > >> The question I had about the handle parameters is around the record:
> >> > >> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
> >> > >> `ProducerRecord<? extends K, ? extends V>` or
> >> > >> `ProducerRecord<? extends Object, ? extends Object>`?
> >> > >>
> >> > >> Also, should the handle function include the `RecordMetadata` as well in
> >> > >> case it is not null?
> >> > >>
> >> > >> We may probably try to write down at least the following handling logic
> >> > >> and see if the given API is sufficient for it: 1) throw an exception
> >> > >> immediately to fail fast and stop the world, 2) log the error, drop the
> >> > >> record, and proceed silently, 3) send such errors to a specific "error"
> >> > >> Kafka topic, or record them as app-level metrics
> >> > >> (https://kafka.apache.org/documentation/#kafka_streams_monitoring) for
> >> > >> monitoring.
> >> > >>
> >> > >> Guozhang
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
> >> > >>
> >> > >> > I did some more digging tonight.
> >> > >> >
> >> > >> > @Ted: It looks like the deserialization handler uses
> >> > >> > "default.deserialization.exception.handler" for the config name. No
> >> > >> > ".class" on the end. I'm inclined to think this should use
> >> > >> > "default.production.exception.handler".
> >> > >> >
> >> > >> > On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
> >> > >> >
> >> > >> > > Okay, I've dug into this a little bit.
> >> > >> > >
> >> > >> > > I think getting access to the serialized record is possible, and changing
> >> > >> > > the naming and return type is certainly doable. However, because we're
> >> > >> > > hooking into the onCompletion callback we have no guarantee that the
> >> > >> > > ProcessorContext state hasn't changed by the time this particular handler
> >> > >> > > runs. So I think the signature would change to something like:
> >> > >> > >
> >> > >> > > ProductionExceptionHandlerResponse handle(final ProducerRecord<..> record,
> >> > >> > >     final Exception exception)
> >> > >> > >
> >> > >> > > Would this be acceptable?
> >> > >> > >
> >> > >> > > On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
> >> > >> > >
> >> > >> > >> Ah good idea. Hmmm. I can line up the naming and return type but I’m not
> >> > >> > >> sure if I can get my hands on the context and the record itself without
> >> > >> > >> other changes.
> >> > >> > >>
> >> > >> > >> Let me dig in and follow up here tomorrow.
> >> > >> > >>
> >> > >> > >> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >> > >> > >>
> >> > >> > >>> Thanks for the KIP.
> >> > >> > >>>
> >> > >> > >>> Are you familiar with KIP-161?
> >> > >> > >>>
> >> > >> > >>>
> >> > >> > >>>
> >> > >> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
> >> > >> > >>>
> >> > >> > >>> I think we should align the design (parameter naming, return types,
> >> > >> > >>> class names, etc.) of KIP-210 to KIP-161 to get a unified user experience.
> >> > >> > >>>
> >> > >> > >>>
> >> > >> > >>>
> >> > >> > >>> -Matthias
> >> > >> > >>>
> >> > >> > >>>
> >> > >> > >>> On 10/18/17 4:20 PM, Matt Farmer wrote:
> >> > >> > >>> > I’ll create the JIRA ticket.
> >> > >> > >>> >
> >> > >> > >>> > I think that config name will work. I’ll update the KIP accordingly.
> >> > >> > >>> >
> >> > >> > >>> > On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >> > >> > >>> >
> >> > >> > >>> >> Can you create a JIRA that corresponds to the KIP?
> >> > >> > >>> >>
> >> > >> > >>> >> For the new config, how about naming it
> >> > >> > >>> >> production.exception.processor.class? This way it is clear that a class
> >> > >> > >>> >> name should be specified.
> >> > >> > >>> >>
> >> > >> > >>> >> Cheers
> >> > >> > >>> >>
> >> > >> > >>> >> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me> wrote:
> >> > >> > >>> >>
> >> > >> > >>> >>> Hello everyone,
> >> > >> > >>> >>>
> >> > >> > >>> >>> This is the discussion thread for the KIP that I just filed here:
> >> > >> > >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
> >> > >> > >>> >>>
> >> > >> > >>> >>> Looking forward to getting some feedback from folks about this idea and
> >> > >> > >>> >>> working toward a solution we can contribute back. :)
> >> > >> > >>> >>>
> >> > >> > >>> >>> Cheers,
> >> > >> > >>> >>> Matt Farmer
> >> > >> > >>> >>>
> >> > >> > >>> >>
> >> > >> > >>> >
> >> > >> > >>>
> >> > >> > >>>
> >> > >> >
> >> > >>
> >> > >>
> >> > >>
> >> > >> --
> >> > >> -- Guozhang
> >> > >>
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>



-- 
-- Guozhang
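
For readers following the thread, here is a minimal sketch of the handler shape discussed above: a handle function that receives the serialized record and the exception (but no ProcessorContext) and returns a response. It is not the code from the pull request. `SerializedRecord` is a hypothetical stand-in for `ProducerRecord<byte[], byte[]>` so the example runs without Kafka on the classpath, the `CONTINUE`/`FAIL` values are an assumption modeled on KIP-161, and the class names are illustrative only. The config key is the one Matt proposed in the thread.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class ProductionHandlerSketch {

    /** Assumed response values, modeled on KIP-161's CONTINUE/FAIL. */
    public enum Response { CONTINUE, FAIL }

    /** Hypothetical stand-in for ProducerRecord<byte[], byte[]>. */
    public static final class SerializedRecord {
        public final String topic;
        public final byte[] key;
        public final byte[] value;

        public SerializedRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Handler shape from the thread: serialized record plus exception, no context. */
    public interface Handler {
        Response handle(SerializedRecord record, Exception exception);
    }

    /** Strategy 1 from the thread: fail fast and stop the world. */
    public static final class FailFast implements Handler {
        @Override
        public Response handle(SerializedRecord record, Exception exception) {
            return Response.FAIL;
        }
    }

    /** Strategy 2 from the thread: log the error, drop the record, count it. */
    public static final class LogAndContinue implements Handler {
        public final AtomicLong dropped = new AtomicLong();

        @Override
        public Response handle(SerializedRecord record, Exception exception) {
            System.err.println("Dropping record for topic " + record.topic
                    + ": " + exception.getMessage());
            dropped.incrementAndGet();
            return Response.CONTINUE;
        }
    }

    public static void main(String[] args) throws Exception {
        // Config name proposed in the thread; the value is a class name.
        Properties props = new Properties();
        props.setProperty("default.production.exception.handler",
                LogAndContinue.class.getName());

        // Load the configured handler reflectively, as a config-driven setup might.
        Handler handler = (Handler) Class
                .forName(props.getProperty("default.production.exception.handler"))
                .getDeclaredConstructor()
                .newInstance();

        SerializedRecord record =
                new SerializedRecord("output-topic", new byte[0], new byte[0]);
        Response response =
                handler.handle(record, new RuntimeException("simulated send failure"));
        System.out.println(response);
    }
}
```

Because the handler sees only the record and the exception, forwarding failed records to another topic would need something extra, such as a producer the handler owns itself. That is the limitation Guozhang raises above, and the ordering concern Matt mentions would apply to any such queue.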
