That sounds reasonable, thanks Matt. As for the implementation, please note that there is another ongoing PR that may touch the same classes that you are working on: https://github.com/apache/kafka/pull/4148
So it may help if you can also take a look at that PR and see if it is compatible with your changes. Guozhang On Tue, Oct 31, 2017 at 10:59 AM, Matt Farmer <m...@frmr.me> wrote: > I've opened this pull request to implement the KIP as currently written: > https://github.com/apache/kafka/pull/4165. It still needs some tests > added, > but largely represents the shape I was going for. > > If there are more points that folks would like to discuss, please let me > know. If I don't hear anything by tomorrow afternoon I'll probably start a > [VOTE] thread. > > Thanks, > Matt > > On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote: > > > I can’t think of a reason that would be problematic. > > > > Most of the time I would write a handler like this, I either want to > > ignore the error or fail and bring everything down so that I can spin it > > back up later and resume from earlier offsets. When we start up after > > crashing we’ll eventually try to process the message we failed to produce > > again. > > > > I’m concerned that “putting in a queue for later” opens you up to putting > > messages into the destination topic in an unexpected order. However if > > others feel differently, I’m happy to talk about it. > > > > On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> > wrote: > > > >> > Please correct me if I'm wrong, but my understanding is that the > record > >> > metadata is always null if an exception occurred while trying to > >> produce. > >> > >> That is right. Thanks. > >> > >> I looked at the example code, and one thing I realized that since we are > >> not passing the context in the handle function, we may not be implement > >> the > >> logic to send the fail records into another queue for future processing. > >> Would people think that would be a big issue? > >> > >> > >> Guozhang > >> > >> > >> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote: > >> > >> > Hello all, > >> > > >> > I've updated the KIP based on this conversation, and made it so that > its > >> > interface, config setting, and parameters line up more closely with > the > >> > interface in KIP-161 (deserialization handler). > >> > > >> > I believe there are a few specific questions I need to reply to..... > >> > > >> > > The question I had about then handle parameters are around the > record, > >> > > should it be `ProducerRecord<byte[], byte[]>`, or be generics of > >> > > `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? > >> extends > >> > > Object, ? extends Object>`? > >> > > >> > At this point in the code we're guaranteed that this is a > >> > ProducerRecord<byte[], byte[]>, so the generics would just make it > >> harder > >> > to work with the key and value. > >> > > >> > > Also, should the handle function include the `RecordMetadata` as > well > >> in > >> > > case it is not null? > >> > > >> > Please correct me if I'm wrong, but my understanding is that the > record > >> > metadata is always null if an exception occurred while trying to > >> produce. > >> > > >> > > We may probably try to write down at least the following handling > >> logic > >> > and > >> > > see if the given API is sufficient for it > >> > > >> > I've added some examples to the KIP. Let me know what you think. > >> > > >> > Cheers, > >> > Matt > >> > > >> > On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote: > >> > > >> > > Thanks for this feedback. I’m at a conference right now and am > >> planning > >> > on > >> > > updating the KIP again with details from this conversation later > this > >> > week. > >> > > > >> > > I’ll shoot you a more detailed response then! :) > >> > > On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> > >> > wrote: > >> > > > >> > >> Thanks for the KIP Matt. > >> > >> > >> > >> Regarding the handle interface of ProductionExceptionHandlerResp > onse, > >> > >> could > >> > >> you write it on the wiki also, along with the actual added config > >> names > >> > >> (e.g. what > >> > >> > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+ > >> > deserialization+exception+handlers > >> > >> described). > >> > >> > >> > >> The question I had about then handle parameters are around the > >> record, > >> > >> should it be `ProducerRecord<byte[], byte[]>`, or be generics of > >> > >> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? > >> extends > >> > >> Object, ? extends Object>`? > >> > >> > >> > >> Also, should the handle function include the `RecordMetadata` as > >> well in > >> > >> case it is not null? > >> > >> > >> > >> We may probably try to write down at least the following handling > >> logic > >> > >> and > >> > >> see if the given API is sufficient for it: 1) throw exception > >> > immediately > >> > >> to fail fast and stop the world, 2) log the error and drop record > and > >> > >> proceed silently, 3) send such errors to a specific "error" Kafka > >> topic, > >> > >> or > >> > >> record it as an app-level metrics ( > >> > >> https://kafka.apache.org/documentation/#kafka_streams_monitoring) > >> for > >> > >> monitoring. > >> > >> > >> > >> Guozhang > >> > >> > >> > >> > >> > >> > >> > >> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote: > >> > >> > >> > >> > I did some more digging tonight. > >> > >> > > >> > >> > @Ted: It looks like the deserialization handler uses > >> > >> > "default.deserialization.exception.handler" for the config > name. No > >> > >> > ".class" on the end. I'm inclined to think this should use > >> > >> > "default.production.exception.handler". > >> > >> > > >> > >> > On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> > wrote: > >> > >> > > >> > >> > > Okay, I've dug into this a little bit. > >> > >> > > > >> > >> > > I think getting access to the serialized record is possible, > and > >> > >> changing > >> > >> > > the naming and return type is certainly doable. However, > because > >> > we're > >> > >> > > hooking into the onCompletion callback we have no guarantee > that > >> the > >> > >> > > ProcessorContext state hasn't changed by the time this > particular > >> > >> handler > >> > >> > > runs. So I think the signature would change to something like: > >> > >> > > > >> > >> > > ProductionExceptionHandlerResponse handle(final > >> ProducerRecord<..> > >> > >> > record, > >> > >> > > final Exception exception) > >> > >> > > > >> > >> > > Would this be acceptable? > >> > >> > > > >> > >> > > On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> > >> wrote: > >> > >> > > > >> > >> > >> Ah good idea. Hmmm. I can line up the naming and return type > but > >> > I’m > >> > >> not > >> > >> > >> sure if I can get my hands on the context and the record > itself > >> > >> without > >> > >> > >> other changes. > >> > >> > >> > >> > >> > >> Let me dig in and follow up here tomorrow. > >> > >> > >> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax < > >> > >> matth...@confluent.io> > >> > >> > >> wrote: > >> > >> > >> > >> > >> > >>> Thanks for the KIP. > >> > >> > >>> > >> > >> > >>> Are you familiar with KIP-161? > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+ > >> > >> > deserialization+exception+handlers > >> > >> > >>> > >> > >> > >>> I thinks, we should align the design (parameter naming, > return > >> > >> types, > >> > >> > >>> class names etc) of KIP-210 to KIP-161 to get a unified user > >> > >> > experience. > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> -Matthias > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> On 10/18/17 4:20 PM, Matt Farmer wrote: > >> > >> > >>> > I’ll create the JIRA ticket. > >> > >> > >>> > > >> > >> > >>> > I think that config name will work. I’ll update the KIP > >> > >> accordingly. > >> > >> > >>> > On Wed, Oct 18, 2017 at 6:09 PM Ted Yu < > yuzhih...@gmail.com> > >> > >> wrote: > >> > >> > >>> > > >> > >> > >>> >> Can you create JIRA that corresponds to the KIP ? > >> > >> > >>> >> > >> > >> > >>> >> For the new config, how about naming it > >> > >> > >>> >> production.exception.processor.class > >> > >> > >>> >> ? This way it is clear that class name should be > specified. > >> > >> > >>> >> > >> > >> > >>> >> Cheers > >> > >> > >>> >> > >> > >> > >>> >> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer < > m...@frmr.me> > >> > >> wrote: > >> > >> > >>> >> > >> > >> > >>> >>> Hello everyone, > >> > >> > >>> >>> > >> > >> > >>> >>> This is the discussion thread for the KIP that I just > filed > >> > >> here: > >> > >> > >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >> > >> > >>> >>> 210+-+Provide+for+custom+error+handling++when+Kafka+ > >> > >> > >>> >>> Streams+fails+to+produce > >> > >> > >>> >>> > >> > >> > >>> >>> Looking forward to getting some feedback from folks about > >> this > >> > >> idea > >> > >> > >>> and > >> > >> > >>> >>> working toward a solution we can contribute back. :) > >> > >> > >>> >>> > >> > >> > >>> >>> Cheers, > >> > >> > >>> >>> Matt Farmer > >> > >> > >>> >>> > >> > >> > >>> >> > >> > >> > >>> > > >> > >> > >>> > >> > >> > >>> > >> > >> > > >> > >> > >> > >> > >> > >> > >> > >> -- > >> > >> -- Guozhang > >> > >> > >> > > > >> > > >> > >> > >> > >> -- > >> -- Guozhang > >> > > > -- -- Guozhang