Hi Damien,

First off, thanks for the KIP, this is definitely a much-needed feature. On the whole it seems pretty straightforward and I am in favor of the proposal. Just a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but ProcessorNode is an internal class (and would expose a lot of internals that we probably don't want to pass in to an exception handler). Would it be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable the handler to potentially forward records, which imo should not be done from the handler, since it could only ever call #forward but not direct where the record is actually forwarded to, and could cause confusion if users aren't aware that the handler is effectively calling from the context of the processor that threw the exception.

2a. If you don't explicitly want the ability to forward records, I would suggest changing the type of this parameter to ProcessingContext, which has all the metadata and useful info of the ProcessorContext but without the forwarding APIs. This would also let us sidestep the following issue:

2b. If you *do* want the ability to forward records, setting aside whether that in and of itself makes sense to do, we would need to pass in either a regular ProcessorContext or a FixedKeyProcessorContext, depending on what kind of processor it is. I'm not quite sure how we could design a clean API here, so I'll hold off until you clarify whether you even want forwarding or not. We would also need to split the input record into a Record vs FixedKeyRecord.

3. One notable difference between this handler and the existing ones you pointed out, the Deserialization/ProductionExceptionHandler, is that the records passed in to those are in serialized bytes, whereas the record here would be POJOs. You account for this by making the parameter type a Record<Object, Object>, but I just wonder how users would be able to read the key/value and figure out what type it should be. For example, would they need to maintain a map from processor name to input record types?
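
To make that last question concrete, here is a rough sketch of what I imagine a user might end up writing. Everything in it is hypothetical: Response, Rec, and TypeAwareHandler are stand-ins I made up for illustration, not the KIP's interface or any existing Kafka Streams class, and it assumes the handler receives the processor name as a String (as suggested in point 1).

```java
import java.util.Map;

// Hypothetical sketch only: none of these types come from the KIP or Kafka Streams.
public class HandlerSketch {

    /** Assumed handler result: keep processing or fail. */
    enum Response { CONTINUE, FAIL }

    /** Stand-in for Record<Object, Object>: the handler only sees untyped POJOs. */
    static final class Rec {
        final Object key;
        final Object value;

        Rec(Object key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    /** A handler that needs a user-maintained map from processor name to value type. */
    static final class TypeAwareHandler {
        private final Map<String, Class<?>> valueTypes;

        TypeAwareHandler(Map<String, Class<?>> valueTypes) {
            this.valueTypes = valueTypes;
        }

        Response handle(String processorName, Rec record, Exception exception) {
            Class<?> expected = valueTypes.get(processorName);
            // Unknown processor, null value, or unexpected type: the value cannot be interpreted.
            if (expected == null || !expected.isInstance(record.value)) {
                return Response.FAIL;
            }
            // Only after the lookup can the value be treated as its real type.
            Object typed = expected.cast(record.value);
            System.out.println("Skipping " + typed + " from " + processorName
                    + ": " + exception.getMessage());
            return Response.CONTINUE;
        }
    }

    public static void main(String[] args) {
        // "parse-order" is a made-up processor name whose input values are Strings.
        TypeAwareHandler handler = new TypeAwareHandler(Map.of("parse-order", String.class));
        Rec record = new Rec("order-1", "not-a-number");
        System.out.println(handler.handle("parse-order", record,
                new NumberFormatException("bad amount")));
    }
}
```

The map has to be kept in sync with the topology by hand, which is the part that worries me.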
If you could provide an example of this new feature in the KIP, it would be very helpful in understanding whether we could do something to make it easier for users to use, or if it would be fine as-is.

4. We should include all the relevant info for a new metric, such as the metric group and recording level. You can look at other metrics KIPs like KIP-444 and KIP-613 for an example. I suspect you intend for this to be in the processor group and at the INFO level?

Hope that all makes sense! Thanks again for the KIP

-Sophie

On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:

> Hi everyone,
>
> After writing quite a few Kafka Streams applications, me and my colleagues
> just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
> to simplify error handling.
> This feature would allow defining an exception handler to automatically
> catch exceptions occurring during the processing of a message.
>
> KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
>
> Feedbacks and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
>