Hi Alieh,

Just a few more comments on the KIP. It is looking much less risky now that the scope is tighter.
[AJS1] It would be nice to have default implementations of the handle methods so that an implementor would not need to implement both of them.

[AJS2] Producer configurations which are class names usually end in “.class”. I suggest “custom.exception.handler.class”.

[AJS3] If I implement a handler and also set a non-default value for one of the new configurations, what happens? I would expect the handler to take precedence. I wasn’t quite clear what “the control will follow the handler instructions” meant.

[AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse, I don’t think you need to state in the comment for ProducerExceptionHandler that RETRY will be interpreted as FAIL.

Thanks,
Andrew

> On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> wrote: > > Hi all, > > > Thanks for the very interesting discussion during my PTO. > > > KIP updates and addressing concerns: > > > 1) Two handle() methods are defined in ProducerExceptionHandler for the two > exceptions with different input parameters so that we have > handle(RecordTooLargeException e, ProducerRecord record) and > handle(UnknownTopicOrPartitionException e, ProducerRecord record) > > > 2) The ProducerExceptionHandler extends `Closeable` as well. > > > 3) The KIP suggests having two more configuration parameters with boolean > values: > > - `drop.invalid.large.records` with a default value of `false` for > swallowing too large records. > > - `retry.unknown.topic.partition` with a default value of `true` that > performs RETRY for `max.block.ms` ms when encountering the > UnknownTopicOrPartitionException. > > > Hope the main concerns are addressed so that we can go forward with voting. > > > Cheers, > > Alieh > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits > <alivsh...@confluent.io.invalid> wrote: > >> Hi Matthias, >> >>> [AL1] While I see the point, I would think having a different callback >> for every exception might not really be elegant? >> >> I'm not sure how to assess the level of elegance of the proposal, but I can >> comment on the technical characteristics: >> >> 1. Having specific interfaces that codify the logic that is currently >> prescribed in the comments reduces the chance of making a mistake. >> Comments may get ignored, misunderstood, etc., but if the contract is >> codified, the compiler will help to enforce the contract. >> 2. Given that the logic is trickier than it seems (record-too-large is >> an example that can easily confuse someone who's not intimately familiar >> with the nuances of the batching logic), having a few more hoops to jump >> through would give a greater chance that whoever tries to add new cases pauses >> and thinks a bit more. >> 3. As Justine pointed out, having different methods will be a forcing >> function to go through a KIP rather than smuggle new cases through the >> implementation. >> 4. Sort of a consequence of the previous 3 -- all those things reduce the >> chance of someone writing code that works with 2 errors and then, when >> more errors are added in the future, suddenly incorrectly ignores new >> errors (the example I gave in the previous email). >> >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as >> business logic.
If a user puts a bad filter condition in their KS app, and >> drops messages >> >> I agree that there is always a chance to get a bug and lose messages, but >> there are generally separation of concerns that has different risk profile: >> the filtering logic may be more rigorously tested and rarely changed (say >> an application developer does it), but setting the topics to produce may be >> done via configuration (e.g. a user of the application does it) and it's >> generally an expectation that users would get an error when configuration >> is incorrect. >> >> What could be worse is that UnknownTopicOrPartitionException can be an >> intermittent error, i.e. with a generally correct configuration, there >> could be metadata propagation problem on the cluster and then a random set >> of records could get lost. >> >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the >> size of the record upfront is exactly what the KIP proposes? No? >> >> It achieves the same result but solves it differently, my proposal: >> >> 1. Application checks the validity of a record (maybe via a new >> validateRecord method) before producing it, and can just exclude it or >> return an error to the user. >> 2. Application produces the record -- at this point there are no records >> that could return record too large, they were either skipped at step 1 or >> we didn't get here because step 1 failed. >> >> Vs. KIP's proposal >> >> 1. Application produces the record. >> 2. Application gets a callback. >> 3. Application returns the action on how to proceed. >> >> The advantage of the former is the clarity of semantics -- the record is >> invalid (property of the record, not a function of server state or server >> configuration) and we can clearly know that it is the record that is bad >> and can never succeed. >> >> The KIP-proposed way actually has a very tricky point: it actually handles >> a subset of record-too-large exceptions. The broker can return >> record-too-large and reject the whole batch (but we don't want to ignore >> those because then we can skip random records that just happened to be in >> the same batch), in some sense we use the same error for 2 different >> conditions and understanding that requires pretty deep understanding of >> Kafka internals. >> >> -Artem >> >> >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid >>> >> wrote: >> >>> My concern with respect to it being fragile: the code that ensures the >>> error type is internal to the producer. Someone may see it and say, I >> want >>> to add such and such error. This looks like internal code, so I don't >> need >>> a KIP, and then they can change it to whatever they want thinking it is >>> within the typical kafka improvement protocol. >>> >>> Relying on an internal change to enforce an external API is fragile in my >>> opinion. That's why I sort of agreed with Artem with enforcing the error >> in >>> the method signature -- part of the public API. >>> >>> Chris's comments on requiring more information to handler again makes me >>> wonder if we are solving a problem of lack of information at the >>> application level with a more powerful solution than we need. (Ie, if we >>> had more information, could the application close and restart the >>> transaction rather than having to drop records) But I am happy to >>> compromise with a handler that we can agree is sufficiently controlled >> and >>> documented. 
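
For illustration, here is a minimal sketch of what the two handle() overloads from Alieh's KIP update, combined with AJS1's request for default implementations, might look like. Everything below is an assumption pieced together from this thread; the Response enum, the signatures and the chosen defaults are not a committed API:

    import java.io.Closeable;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    // Hypothetical sketch only: per-exception handle() overloads with defaults
    // that preserve today's behaviour, so an implementor overrides only the
    // case they actually care about.
    public interface ProducerExceptionHandler extends Closeable {

        enum Response { FAIL, RETRY, SWALLOW }

        default Response handle(RecordTooLargeException e, ProducerRecord<?, ?> record) {
            return Response.FAIL;   // assumed default: fail, as the producer does today
        }

        default Response handle(UnknownTopicOrPartitionException e, ProducerRecord<?, ?> record) {
            return Response.RETRY;  // assumed default: keep retrying internally, as today
        }

        @Override
        default void close() { }
    }

With defaults like these, a user who only wants to drop oversized records overrides a single method and cannot accidentally change the behaviour for the other exception.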
>>> >>> Justine >>> >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid> >>> wrote: >>> >>>> Hi Alieh, >>>> >>>> Continuing prior discussions: >>>> >>>> 1) Regarding the "flexibility" discussion, my overarching point is >> that I >>>> don't see the point in allowing for this kind of pluggable logic >> without >>>> also covering more scenarios. Take example 2 in the KIP: if we're going >>> to >>>> implement retries only on "important" topics when a topic partition >> isn't >>>> found, why wouldn't we also want to be able to do this for other >> errors? >>>> Again, taking authorization errors as an example, why wouldn't we want >> to >>>> be able to fail when we can't write to "important" topics because the >>>> producer principal lacks sufficient ACLs, and drop the record if the >>> topic >>>> isn't "important"? In a security-conscious environment with >>>> runtime-dependent topic routing (which is a common feature of many >> source >>>> connectors, such as the Debezium connectors), this seems fairly likely. >>>> >>>> 2) As far as changing the shape of the API goes, I like Artem's idea of >>>> splitting out the interface based on specific exceptions. This may be a >>>> little laborious to expand in the future, but if we really want to >>>> limit the exceptions that we cover with the handler and move slowly and >>>> cautiously, then IMO it'd be reasonable to reflect that in the >>> interface. I >>>> also acknowledge that there's no way to completely prevent people from >>>> shooting themselves in the foot by implementing the API incorrectly, >> but >>> I >>>> think it's worth it to do what we can--including leveraging the Java >>>> language's type system--to help them, so IMO there's value to >> eliminating >>>> the implicit behavior of failing when a policy returns RETRY for a >>>> non-retriable error. This can take a variety of shapes and I'm not >> going >>> to >>>> insist on anything specific, but I do want to again raise my concerns >>> with >>>> the current proposal and request that we find something a little >> better. >>>> >>>> 3) Concerning the default implementation--actually, I meant what I >> wrote >>> :) >>>> I don't want a "second" default, I want an implementation of this >>> interface >>>> to be used as the default if no others are specified. The behavior of >>> this >>>> default implementation would be identical to existing behavior (so >> there >>>> would be no backwards compatibility concerns like the ones raised by >>>> Matthias), but it would be possible to configure this default handler >>> class >>>> to behave differently for a basic set of scenarios. This would mirror >>> (pun >>>> intended) the approach we've taken with Mirror Maker 2 and its >>>> ReplicationPolicy interface [1]. There is a default implementation >>>> available [2] that recognizes a handful of basic configuration >> properties >>>> [3] for simple tweaks, but if users want, they can also implement their >>> own >>>> replication policy for more fine-grained logic if those properties >> aren't >>>> flexible enough. >>>> >>>> More concretely, I'm imagining something like this for the producer >>>> exception handler: >>>> >>>> - Default implementation class >>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler >>>> - This class would recognize two properties: >>>> - drop.invalid.large.records: Boolean property, defaults to false. 
If >>>> "false", then causes the handler to return FAIL whenever >>>> a RecordTooLargeException is encountered; if "true", then causes >>>> SWALLOW/SKIP/DROP to be returned instead >>>> - unknown.topic.partition.retry.timeout.ms: Integer property, >> defaults >>>> to >>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered, >>>> causes the handler to return FAIL if that record has been pending for >>> more >>>> than the retry timeout; otherwise, causes RETRY to be returned >>>> >>>> I think this is worth addressing now instead of later because it forces >>> us >>>> to evaluate the usefulness of this interface and it addresses a >>>> long-standing issue not just with Kafka Connect, but with the Java >>> producer >>>> in general. For reference, here are a few tickets I collected after >>> briefly >>>> skimming our Jira showing that this is a real pain point for users: >>>> https://issues.apache.org/jira/browse/KAFKA-10340, >>>> https://issues.apache.org/jira/browse/KAFKA-12990, >>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is >>>> frequently reported with Kafka Connect, it applies to anyone who >>> configures >>>> a producer to use a high retry timeout. I am aware of the max.block.ms >>>> property, but it's painful and IMO poor behavior to require users to >>> reduce >>>> the value of this property just to handle the single scenario when >> trying >>>> to write to topics that don't exist, since it would also limit the >> retry >>>> timeout for other operations that are legitimately retriable. >>>> >>>> Raising new points: >>>> >>>> 5) I don't see the interplay between this handler and existing >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming >> that >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms" >>> would >>>> take precedence over the handler and once they are exhausted, the >>>> record/batch will fail no matter what? If so, it's probably worth >> briefly >>>> mentioning this (no more than a sentence or two) in the KIP, and if >> not, >>>> I'm curious what you have in mind. >>>> >>>> 6) I also wonder if the API provides enough information in its current >>>> form. Would it be possible to provide handlers with some way of >> tracking >>>> how long a record has been pending for (i.e., how long it's been since >>> the >>>> record was provided as an argument to Producer::send)? One way to do >> this >>>> could be to add a method like `onNewRecord(ProducerRecord)` and >>>> allow/require the handler to do its own bookkeeping, probably with a >>>> matching `onRecordSuccess(ProducerRecord)` method so that the handler >>>> doesn't eat up an ever-increasing amount of memory trying to track >>> records. >>>> An alternative could be to include information about the initial time >> the >>>> record was received by the producer and the number of retries that have >>>> been performed for it as parameters in the handle method(s), but I'm >> not >>>> sure how easy this would be to implement and it might clutter things >> up a >>>> bit too much. >>>> >>>> 7) A small request--can we add Closeable (or, if you prefer, >>> AutoCloseable) >>>> as a superinterface for the handler interface? 
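
As a rough illustration of how the default-handler idea above could surface to a low-/no-code user, the snippet below wires the proposed properties into an ordinary producer configuration. The class name and the property names are the ones suggested in this thread and do not exist today:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DefaultHandlerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Hypothetical settings from this thread -- none of these configs exist yet.
            props.put("custom.exception.handler.class",
                      "org.apache.kafka.clients.producer.DefaultProducerExceptionHandler");
            props.put("drop.invalid.large.records", "true");                  // skip oversized records
            props.put("unknown.topic.partition.retry.timeout.ms", "60000");   // give up after 60s

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() as usual; the handler settings above would decide whether
                // oversized records are dropped and how long missing topics are retried.
            }
        }
    }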
>>>> >>>> [1] - >>>> >>>> >>> >> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html >>>> [2] - >>>> >>>> >>> >> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html >>>> [3] - >>>> >>>> >>> >> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG >>>> , >>>> >>>> >>> >> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG >>>> >>>> Cheers, >>>> >>>> Chris >>>> >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> >>> wrote: >>>> >>>>> Very interesting discussion. >>>>> >>>>> Seems a central point is the question about "how generic" we approach >>>>> this, and some people think we need to be conservative and others >> think >>>>> we should try to be as generic as possible. >>>>> >>>>> Personally, I think following a limited scope for this KIP (by >>>>> explicitly saying we only cover RecordTooLarge and >>>>> UnknownTopicOrPartition) might be better. We have concrete tickets >> that >>>>> we address, while for other exception (like authorization) we don't >>> know >>>>> if people want to handle it to begin with. Boiling the ocean might >> not >>>>> get us too far, and being somewhat pragmatic might help to move this >>> KIP >>>>> forward. -- I also agree with Justin and Artem, that we want to be >>>>> careful anyway to not allow users to break stuff too easily. >>>>> >>>>> As the same time, I agree that we should setup this change in a >> forward >>>>> looking way, and thus having a single generic handler allows us to >>> later >>>>> extend the handler more easily. This should also simplify follow up >> KIP >>>>> that might add new error cases (I actually mentioned one more to >> Alieh >>>>> already, but we both agreed that it might be best to exclude it from >>> the >>>>> KIP right now, to make the 3.8 deadline. Doing a follow up KIP is not >>>>> the end of the world.) >>>>> >>>>> >>>>> >>>>> @Chris: >>>>> >>>>> (2) This sounds fair to me, but not sure how "bad" it actually would >>> be? >>>>> If the contract is clearly defined, it seems to be fine what the KIP >>>>> proposes, and given that such a handler is an expert API, and we can >>>>> provide "best practices" (cf my other comment below in [AL1]), being >> a >>>>> little bit pragmatic sound fine to me. >>>>> >>>>> Not sure if I understand Justin's argument on this question? >>>>> >>>>> >>>>> (3) About having a default impl or not. I am fine with adding one, >> even >>>>> if I am not sure why Connect could not just add its own one and plug >> it >>>>> in (and we would add corresponding configs for Connect, but not for >> the >>>>> producer itself)? For this case, we could also do this as a follow up >>>>> KIP, but happy to include it in this KIP to provide value to Connect >>>>> right away (even if the value might not come right away if we miss >> the >>>>> 3.8 deadline due to expanded KIP scope...) -- For KS, we would for >>> sure >>>>> plugin our own impl, and lock down the config such that users cannot >>> set >>>>> their own handler on the internal producer to begin with. Might be >> good >>>>> to elaborate why the producer should have a default? We might >> actually >>>>> want to add this to the KIP right away? >>>>> >>>>> The key for a default impl would be, to not change the current >>> behavior, >>>>> and having no default seems to achieve this. 
For the two cases you >>>>> mentioned, it's unclear to me what default value on "upper bound on >>>>> retires" for UnkownTopicOrPartitionException we should set? Seems it >>>>> would need to be the same as `delivery.timeout.ms`? However, if >> users >>>>> have `delivery.timeout.ms` actually overwritten we would need to set >>>>> this config somewhat "dynamic"? Is this feasible? If we hard-code 2 >>>>> minutes, it might not be backward compatible. I have the impression >> we >>>>> might introduce some undesired coupling? -- For the "record too >> large" >>>>> case, the config seems to be boolean and setting it to `false` by >>>>> default seems to provide backward compatibility. >>>>> >>>>> >>>>> >>>>> @Artem: >>>>> >>>>> [AL1] While I see the point, I would think having a different >> callback >>>>> for every exception might not really be elegant? In the end, the >>> handler >>>>> is an very advanced feature anyway, and if it's implemented in a bad >>>>> way, well, it's a user error -- we cannot protect users from >>> everything. >>>>> To me, a handler like this, is to some extend "business logic" and >> if a >>>>> user gets business logic wrong, it's hard to protect them. -- We >> would >>>>> of course provide best practice guidance in the JaveDocs, and explain >>>>> that a handler should have explicit `if` statements for stuff it want >>> to >>>>> handle, and only a single default which return FAIL. >>>>> >>>>> >>>>> [AL2] Yes, but for KS we would retry at the application layer. Ie, >> the >>>>> TX is not completed, we clean up and setup out task from scratch, to >>>>> ensure the pending transaction is completed before we resume. If the >> TX >>>>> was indeed aborted, we would retry from older offset and thus just >> hit >>>>> the same error again and the loop begins again. >>>>> >>>>> >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extend as >>>>> business logic. If a user puts a bad filter condition in their KS >> app, >>>>> and drops messages, it nothing we can do about it, and this handler >>>>> IMHO, has a similar purpose. This is also the line of thinking I >> apply >>>>> to EOS, to address Justin's concern about "should we allow to drop >> for >>>>> EOS", and my answer is "yes", because it's more business logic than >>>>> actual error handling IMHO. And by default, we fail... So users >> opt-in >>>>> to add business logic to drop records. It's an application level >>>>> decision how to write the code. >>>>> >>>>> >>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking >>> the >>>>> size of the record upfront is exactly what the KIP proposes? No? >>>>> >>>>> >>>>> >>>>> @Justin: >>>>> >>>>>> I saw the sample >>>>>> code -- is it just an if statement checking for the error before >> the >>>>>> handler is invoked? That seems a bit fragile. >>>>> >>>>> What do you mean by fragile? Not sure if I see your point. >>>>> >>>>> >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote: >>>>>> Hi Alieh, >>>>>> >>>>>> Thanks for the KIP. The motivation talks about very specific >> cases, >>>> but >>>>>> the interface is generic. >>>>>> >>>>>> [AL1] >>>>>> If the interface evolves in the future I think we could have the >>>>> following >>>>>> confusion: >>>>>> >>>>>> 1. A user implemented SWALLOW action for both >> RecordTooLargeException >>>> and >>>>>> UnknownTopicOrPartitionException. For simpicity they just return >>>> SWALLOW >>>>>> from the function, because it elegantly handles all known cases. >>>>>> 2. 
The interface has evolved to support a new exception. >>>>>> 3. The user has upgraded their Kafka client. >>>>>> >>>>>> Now a new kind of error gets dropped on the floor without user's >>>>> intention >>>>>> and it would be super hard to detect and debug. >>>>>> >>>>>> To avoid the confusion, I think we should use handlers for specific >>>>>> exceptions. Then if a new exception is added it won't get silently >>>>> swalled >>>>>> because the user would need to add new functionality to handle it. >>>>>> >>>>>> I also have some higher level comments: >>>>>> >>>>>> [AL2] >>>>>>> it throws a TimeoutException, and the user can only blindly retry, >>>> which >>>>>> may result in an infinite retry loop >>>>>> >>>>>> If the TimeoutException happens during transactional processing >>>> (exactly >>>>>> once is the desired sematnics), then the client should not retry >> when >>>> it >>>>>> gets TimeoutException because without knowing the reason for >>>>>> TimeoutExceptions, the client cannot know whether the message got >>>>> actually >>>>>> produced or not and retrying the message may result in duplicatees. >>>>>> >>>>>>> The thrown TimeoutException "cuts" the connection to the >> underlying >>>> root >>>>>> cause of missing metadata >>>>>> >>>>>> Maybe we should fix the error handling and return the proper >>> underlying >>>>>> message? Then the application can properly handle the message >> based >>> on >>>>>> preferences. >>>>>> >>>>>> From the product perspective, it's not clear how safe it is to >>> blindly >>>>>> ignore UnknownTopicOrPartitionException. This could lead to >>> situations >>>>>> when a simple typo could lead to massive data loss (part of the >> data >>>>> would >>>>>> effectively be produced to a "black hole" and the user may not >> notice >>>> it >>>>>> for a while). >>>>>> >>>>>> In which situations would you recommend the user to "black hole" >>>> messages >>>>>> in case of misconfiguration? >>>>>> >>>>>> [AL3] >>>>>> >>>>>>> If the custom handler decides on SWALLOW for >>> RecordTooLargeException, >>>>>> >>>>>> Is it my understanding that this KIP proposes that functionality >> that >>>>> would >>>>>> only be able to SWALLOW RecordTooLargeException that happen because >>> the >>>>>> producer cannot produce the record (if the broker rejects the >> batch, >>>> the >>>>>> error won't get to the handler, because we cannot know which other >>>>> records >>>>>> get ignored). In this case, why not just check the locally >>> configured >>>>> max >>>>>> record size upfront and not produce the recrord in the first place? >>>>> Maybe >>>>>> we can expose a validation function from the producer that could >>>> validate >>>>>> the records locally, so we don't need to produce the record in >> order >>> to >>>>>> know that it's invalid. >>>>>> >>>>>> -Artem >>>>>> >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan >>>>> <jols...@confluent.io.invalid> >>>>>> wrote: >>>>>> >>>>>>> Alieh and Chris, >>>>>>> >>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just >>>> didn't >>>>>>> understand how that would be ensured on the producer side. I saw >> the >>>>> sample >>>>>>> code -- is it just an if statement checking for the error before >> the >>>>>>> handler is invoked? That seems a bit fragile. >>>>>>> >>>>>>> Can you clarify what you mean by `since the code does not reach >> the >>> KS >>>>>>> interface and breaks somewhere in producer.` If we surfaced this >>> error >>>>> to >>>>>>> the application in a better way would that also be a solution to >> the >>>>> issue? 
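
For context, the kind of producer-internal guard being discussed might look something like the sketch below (illustrative only, not the code from the PoC PR; it reuses the hypothetical ProducerExceptionHandler and Response names from the interface sketched above). Only the two exceptions covered by the KIP ever reach the handler, and RETRY is coerced to FAIL where retrying cannot help:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    class HandlerDispatchSketch {
        ProducerExceptionHandler.Response decide(RuntimeException e,
                                                 ProducerRecord<?, ?> record,
                                                 ProducerExceptionHandler handler) {
            if (handler == null) {
                return ProducerExceptionHandler.Response.FAIL;   // no handler configured: fail as today
            }
            if (e instanceof RecordTooLargeException) {
                ProducerExceptionHandler.Response r = handler.handle((RecordTooLargeException) e, record);
                // retrying cannot shrink the record, so RETRY is treated as FAIL
                return r == ProducerExceptionHandler.Response.RETRY
                        ? ProducerExceptionHandler.Response.FAIL : r;
            }
            if (e instanceof UnknownTopicOrPartitionException) {
                return handler.handle((UnknownTopicOrPartitionException) e, record);
            }
            return ProducerExceptionHandler.Response.FAIL;       // every other error keeps existing behaviour
        }
    }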
>>>>>>> >>>>>>> Justine >>>>>>> >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi >>>>> <asae...@confluent.io.invalid> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> >>>>>>>> Thank you, Chris and Justine, for the feedback. >>>>>>>> >>>>>>>> >>>>>>>> @Chris >>>>>>>> >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the one >>> you >>>>>>>> mentioned. We are going to cover more exceptions in the future, >> but >>>> as >>>>>>>> Justine mentioned, we must be very conservative about adding more >>>>>>>> exceptions. Additionally, flexibility mainly means that the user >> is >>>>> able >>>>>>> to >>>>>>>> develop their own code. As mentioned in the motivation section >> and >>>> the >>>>>>>> examples, sometimes the user decides on dropping a record based >> on >>>> the >>>>>>>> topic, for example. >>>>>>>> >>>>>>>> >>>>>>>> 2) Defining two separate methods for retriable and non-retriable >>>>>>>> exceptions: although the idea is brilliant, the user may still >>> make a >>>>>>>> mistake by implementing the wrong method and see a non-expecting >>>>>>> behaviour. >>>>>>>> For example, he may implement handleRetriable() for >>>>>>> RecordTooLargeException >>>>>>>> and define SWALLOW for the exception, but in practice, he sees no >>>>> change >>>>>>> in >>>>>>>> default behaviour since he implemented the wrong method. I think >> we >>>> can >>>>>>>> never reduce the user’s mistakes to 0. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> 3) Default implementation for Handler: the default behaviour is >>>> already >>>>>>>> preserved with NO need of implementing any handler or setting the >>>>>>>> corresponding config parameter `custom.exception.handler`. What >> you >>>>> mean >>>>>>> is >>>>>>>> actually having a second default, which requires having both >>>> interface >>>>>>> and >>>>>>>> config parameters. About UnknownTopicOrPartitionException: the >>>> producer >>>>>>>> already offers the config parameter `max.block.ms` which >>> determines >>>>> the >>>>>>>> duration of retrying. The main purpose of the user who needs the >>>>> handler >>>>>>> is >>>>>>>> to get the root cause of TimeoutException and handle it in the >> way >>> he >>>>>>>> intends. The KIP explains the necessity of it for KS users. >>>>>>>> >>>>>>>> >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the error, >>>> while >>>>>>>> SKIP means skip the record, I think. If it makes sense for more >>> ppl, >>>> I >>>>>>> can >>>>>>>> change it to SKIP >>>>>>>> >>>>>>>> >>>>>>>> @Justine >>>>>>>> >>>>>>>> 1) was addressed by Chris. >>>>>>>> >>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently, >>> there >>>> is >>>>>>> no >>>>>>>> way to handle these issues application-side. Even KS users who >>>>> implement >>>>>>> KS >>>>>>>> ProductionExceptionHandler are not able to handle the exceptions >> as >>>>> they >>>>>>>> intend since the code does not reach the KS interface and breaks >>>>>>> somewhere >>>>>>>> in producer. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Alieh >>>>>>>> >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton < >>>> fearthecel...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Justine, >>>>>>>>> >>>>>>>>> The method signatures for the interface are indeed open-ended, >> but >>>> the >>>>>>>> KIP >>>>>>>>> states that its uses will be limited. See the motivation >> section: >>>>>>>>> >>>>>>>>>> We believe that the user should be able to develop custom >>> exception >>>>>>>>> handlers for managing producer exceptions. 
On the other hand, >> this >>>>> will >>>>>>>> be >>>>>>>>> an expert-level API, and using that may result in strange >>> behaviour >>>> in >>>>>>>> the >>>>>>>>> system, making it hard to find the root cause. Therefore, the >>> custom >>>>>>>>> handler is currently limited to handling RecordTooLargeException >>> and >>>>>>>>> UnknownTopicOrPartitionException. >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> >>>>>>>>> Chris >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan >>>>> <jols...@confluent.io.invalid >>>>>>>> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Alieh, >>>>>>>>>> >>>>>>>>>> I was out for KSB and then was also sick. :( >>>>>>>>>> >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two >>> specific >>>>>>>>>> scenarios, since the interface accepts a generic Exception e >> and >>>> can >>>>>>> be >>>>>>>>>> implemented to check if that e is an instanceof any exception. >> I >>>>>>> didn't >>>>>>>>> see >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit concerned >>>> about >>>>>>>>> this >>>>>>>>>> actually. I'm concerned about the opened-endedness and the >>> contract >>>>>>> we >>>>>>>>> have >>>>>>>>>> with transactions. We are allowing the client to make decisions >>>> that >>>>>>>> are >>>>>>>>>> somewhat invisible to the server. As an aside, can we build in >>> log >>>>>>>>> messages >>>>>>>>>> when the handler decides to skip etc a message. I'm really >>>> concerned >>>>>>>>> about >>>>>>>>>> messages being silently dropped. >>>>>>>>>> >>>>>>>>>> I do think Chris's point 2) about retriable vs non retriable >>> errors >>>>>>> is >>>>>>>>>> fair. I'm a bit concerned about skipping a unknown topic or >>>> partition >>>>>>>>>> exception too early, as there are cases where it can be >>> transient. >>>>>>>>>> >>>>>>>>>> I'm still a little bit wary of allowing dropping records as >> part >>> of >>>>>>> EOS >>>>>>>>>> generally as in many cases, these errors signify an issue with >>> the >>>>>>>>> original >>>>>>>>>> data. I understand that streams and connect/mirror maker may >> have >>>>>>>> reasons >>>>>>>>>> they want to progress past these messages, but wondering if >> there >>>> is >>>>>>> a >>>>>>>>> way >>>>>>>>>> that can be done application-side. I'm willing to accept this >>> sort >>>> of >>>>>>>>>> proposal if we can make it clear that this sort of thing is >>>> happening >>>>>>>> and >>>>>>>>>> we limit the blast radius for what we can do. >>>>>>>>>> >>>>>>>>>> Justine >>>>>>>>>> >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton >>>> <chr...@aiven.io.invalid >>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Alieh, >>>>>>>>>>> >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some >>>> thoughts >>>>>>>>> that >>>>>>>>>>> I'd like to see addressed before voting. >>>>>>>>>>> >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface, >>> why >>>>>>>> are >>>>>>>>> we >>>>>>>>>>> only limiting the uses for this interface to two very specific >>>>>>>>> scenarios? >>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled >> as >>>>>>> well >>>>>>>>>>> (allowing users to drop records destined for some off-limits >>>>>>> topics, >>>>>>>> or >>>>>>>>>>> retry for a limited duration in case there's a delay in the >>>>>>>> propagation >>>>>>>>>> of >>>>>>>>>>> ACL updates)? 
It'd be nice to see some analysis of other >> errors >>>>>>> that >>>>>>>>>> could >>>>>>>>>>> be handled with this new API, both to avoid the follow-up work >>> of >>>>>>>>> another >>>>>>>>>>> KIP to address them in the future, and to make sure that we're >>> not >>>>>>>>>> painting >>>>>>>>>>> ourselves into a corner with the current API in a way that >> would >>>>>>> make >>>>>>>>>>> future modifications difficult. >>>>>>>>>>> >>>>>>>>>>> 2) Something feels a bit off with how retriable vs. >>> non-retriable >>>>>>>>> errors >>>>>>>>>>> are handled with the interface. Why not introduce two separate >>>>>>>> methods >>>>>>>>> to >>>>>>>>>>> handle each case separately? That way there's no ambiguity or >>>>>>>> implicit >>>>>>>>>>> behavior when, e.g., attempting to retry on a >>>>>>>> RecordTooLargeException. >>>>>>>>>> This >>>>>>>>>>> could be something like `NonRetriableResponse >>>>>>> handle(ProducerRecord, >>>>>>>>>>> Exception)` and `RetriableResponse >>> handleRetriable(ProducerRecord, >>>>>>>>>>> Exception)`, though the exact names and shape can obviously be >>>>>>> toyed >>>>>>>>>> with a >>>>>>>>>>> bit. >>>>>>>>>>> >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may >> benefit >>>>>>> some >>>>>>>>>>> users' custom producer applications and Kafka Streams >>>> applications, >>>>>>>> it >>>>>>>>>>> comes at significant deployment cost for other low-/no-code >>>>>>>>> environments, >>>>>>>>>>> including but not limited to Kafka Connect and MirrorMaker 2. >>> Can >>>>>>> we >>>>>>>>> add >>>>>>>>>> a >>>>>>>>>>> default implementation of the exception handler that allows >> for >>>>>>> some >>>>>>>>>> simple >>>>>>>>>>> behavior to be tweaked via configuration property? Two things >>> that >>>>>>>>> would >>>>>>>>>> be >>>>>>>>>>> nice to have would be A) an upper bound on the retry time for >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to drop >>>> records >>>>>>>>> that >>>>>>>>>>> are large enough to trigger a record-too-large exception. >>>>>>>>>>> >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the >>>> proposed >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, >>> especially >>>>>>>> when >>>>>>>>>>> trying to guess the behavior for retriable errors. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> >>>>>>>>>>> Chris >>>>>>>>>>> >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi >>>>>>>>>> <asae...@confluent.io.invalid >>>>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi all, >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> A summary of the KIP and the discussions: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> The KIP introduces a handler interface for Producer in order >> to >>>>>>>>> handle >>>>>>>>>>> two >>>>>>>>>>>> exceptions: RecordTooLargeException and >>>>>>>>>> UnknownTopicOrPartitionException. >>>>>>>>>>>> The handler handles the exceptions per-record. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> - Do we need this handler? [Motivation and Examples >> sections] >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer >>>>>>> collects >>>>>>>>>>> multiple >>>>>>>>>>>> records in batches. Then a RecordTooLargeException related >> to a >>>>>>>>> single >>>>>>>>>>>> record leads to failing the entire batch. A custom exception >>>>>>>> handler >>>>>>>>> in >>>>>>>>>>>> this case may decide on dropping the record and continuing >> the >>>>>>>>>>> processing. >>>>>>>>>>>> See Example 1, please. 
2) More over, in Kafka Streams, a >> record >>>>>>>> that >>>>>>>>> is >>>>>>>>>>> too >>>>>>>>>>>> large is a poison pill record, and there is no way to skip >> over >>>>>>>> it. A >>>>>>>>>>>> handler would allow us to react to this error inside the >>>>>>> producer, >>>>>>>>>> i.e., >>>>>>>>>>>> local to where the error happens, and thus simplify the >> overall >>>>>>>> code >>>>>>>>>>>> significantly. Please read the Motivation section for more >>>>>>>>> explanation. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer >>>>>>>> handles >>>>>>>>>>> this >>>>>>>>>>>> exception internally and only issues a WARN log about missing >>>>>>>>> metadata >>>>>>>>>>> and >>>>>>>>>>>> retries internally. Later, when the producer hits " >>>>>>>>> deliver.timeout.ms" >>>>>>>>>>> it >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly >> retry, >>>>>>>> which >>>>>>>>>> may >>>>>>>>>>>> result in an infinite retry loop. The thrown TimeoutException >>>>>>>> "cuts" >>>>>>>>>> the >>>>>>>>>>>> connection to the underlying root cause of missing metadata >>>>>>> (which >>>>>>>>>> could >>>>>>>>>>>> indeed be a transient error but is persistent for a >>> non-existing >>>>>>>>>> topic). >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite >> retry >>>>>>>> loop. >>>>>>>>>>> Kafka >>>>>>>>>>>> Streams also blindly retries for this case, and the >> application >>>>>>>> gets >>>>>>>>>>> stuck. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> - Having interface vs configuration option: [Motivation, >>>>>>> Examples, >>>>>>>>> and >>>>>>>>>>>> Rejected Alternatives sections] >>>>>>>>>>>> >>>>>>>>>>>> Our solution is introducing an interface due to the full >>>>>>>> flexibility >>>>>>>>>> that >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones, >>>>>>>> determine >>>>>>>>>> the >>>>>>>>>>>> handler's behaviour based on the situation. For example, f >>>>>>>>>>>> acing UnknownTopicOrPartitionException*, *the user may want >> to >>>>>>>> raise >>>>>>>>> an >>>>>>>>>>>> error for some topics but retry it for other topics. Having a >>>>>>>>>>> configuration >>>>>>>>>>>> option with a fixed set of possibilities does not serve the >>>>>>> user's >>>>>>>>>>>> needs. See Example 2, please. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces >> section] >>>>>>>>>>>> >>>>>>>>>>>> If the custom handler decides on SWALLOW for >>>>>>>> RecordTooLargeException, >>>>>>>>>>> then >>>>>>>>>>>> this record will not be a part of the batch of transactions >> and >>>>>>>> will >>>>>>>>>> also >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So no >>>>>>> worries >>>>>>>>>> about >>>>>>>>>>>> getting a RecordTooLargeException from the broker in this >> case, >>>>>>> as >>>>>>>>> the >>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW means >>> drop >>>>>>>> the >>>>>>>>>>> record >>>>>>>>>>>> and continue/swallow the error. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> - What if the handle() method implements RETRY for >>>>>>>>>>> RecordTooLargeException? >>>>>>>>>>>> [Proposed Changes section] >>>>>>>>>>>> >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal to >> FAIL. >>>>>>>> This >>>>>>>>> is >>>>>>>>>>>> well documented/informed in javadoc. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> - What if the handle() method of the handler throws an >>> exception? 
>>>>>>>>>>>> >>>>>>>>>>>> The handler is expected to have correct code. If it throws an >>>>>>>>>> exception, >>>>>>>>>>>> everything fails. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846 >>> >>>>>>> ONLY >>>>>>>>> for >>>>>>>>>>>> RecordTooLargeException. The code changes related to >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR >>> LATER. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Looking forward to your feedback again. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> >>>>>>>>>>>> Alieh >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True < >> k...@kirktrue.pro> >>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Alieh, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the updates! >>>>>>>>>>>>> >>>>>>>>>>>>> Comments inline... >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi >>>>>>>>>>> <asae...@confluent.io.INVALID >>>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks a lot for the constructive feedbacks! >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Addressing some of the main concerns: >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by broker, >>>>>>>> producer >>>>>>>>>> and >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler` >> interface >>>>>>>> is >>>>>>>>>>>>> introduced >>>>>>>>>>>>>> to affect only the exceptions thrown from the producer. >> This >>>>>>>> KIP >>>>>>>>>> very >>>>>>>>>>>>>> specifically means to provide a possibility to manage the >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the Producer.send() >>>>>>>> method. >>>>>>>>>>>> Please >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I >>>>>>> investigated >>>>>>>>> the >>>>>>>>>>>> issue >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern about >> how >>>>>>>> we >>>>>>>>>>> handle >>>>>>>>>>>>> the >>>>>>>>>>>>>> errors as well. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called >>>>>>>> when >>>>>>>>>> the >>>>>>>>>>>>> record >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not the >>>>>>>> desired >>>>>>>>>>> time >>>>>>>>>>>>> for >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions beforehand. >>>>>>>>>>>>> >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when >>>>>>> Callback >>>>>>>> is >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it. >>>>>>>>>>>>> >>>>>>>>>>>>>> - What if the custom handler returns RETRY for >>>>>>>>>>>>> `RecordTooLargeException`? I >>>>>>>>>>>>>> assume changing the producer configuration at runtime is >>>>>>>>> possible. >>>>>>>>>> If >>>>>>>>>>>> so, >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in the >>>>>>> next >>>>>>>>>> try, >>>>>>>>>>>> the >>>>>>>>>>>>>> too large record is not poisoning any more. I am not 100% >>>>>>> sure >>>>>>>>>> about >>>>>>>>>>>> the >>>>>>>>>>>>>> technical details, though. Otherwise, we can consider the >>>>>>> RETRY >>>>>>>>> as >>>>>>>>>>> FAIL >>>>>>>>>>>>> for >>>>>>>>>>>>>> this exception. Another solution would be to consider a >>>>>>>> constant >>>>>>>>>>> number >>>>>>>>>>>>> of >>>>>>>>>>>>>> times for RETRY which can be useful for other exceptions as >>>>>>>> well. >>>>>>>>>>>>> >>>>>>>>>>>>> It’s not presently possible to change the configuration of >> an >>>>>>>>>> existing >>>>>>>>>>>>> Producer at runtime. 
So if a record hits a >>>>>>>> RecordTooLargeException >>>>>>>>>>> once, >>>>>>>>>>>> no >>>>>>>>>>>>> amount of retrying (with the current Producer) will change >>> that >>>>>>>>> fact. >>>>>>>>>>> So >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of >> RETRY >>>>>>> for >>>>>>>>> an >>>>>>>>>>>>> “oversized” record. >>>>>>>>>>>>> >>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I >>>>>>>> think >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be exactly >>>>>>>> like >>>>>>>>>> when >>>>>>>>>>>> no >>>>>>>>>>>>>> custom handler is defined since the user actually did not >>>>>>> have >>>>>>>> a >>>>>>>>>>>> working >>>>>>>>>>>>>> handler. >>>>>>>>>>>>> >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the >> right >>>>>>>>>> choice. >>>>>>>>>>> It >>>>>>>>>>>>> then becomes a silent failure that might have repercussions, >>>>>>>>>> depending >>>>>>>>>>> on >>>>>>>>>>>>> the business logic. A user would have to proactively trawls >>>>>>>> through >>>>>>>>>> the >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it. >>>>>>>>>>>>> >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though… >>>>>>>>>>>>> >>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As >>>>>>>>>> explained >>>>>>>>>>>> in >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that the >>>>>>> handler >>>>>>>>>> will >>>>>>>>>>> be >>>>>>>>>>>>>> used for a greater number of exceptions in the future. >>>>>>>> Defining a >>>>>>>>>>>>>> configuration parameter for each exception may make the >>>>>>>>>>> configuration a >>>>>>>>>>>>> bit >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility. >>>>>>>>>>>>> >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird >> and >>>>>>>>>> limiting. >>>>>>>>>>>>> Forget I ever suggested it ;) >>>>>>>>>>>>> >>>>>>>>>>>>> I’d think additional background in the Motivation section >>> would >>>>>>>>> help >>>>>>>>>> me >>>>>>>>>>>>> understand how users might use this feature beyond a) >> skipping >>>>>>>>>>>> “oversized” >>>>>>>>>>>>> records, and b) not retrying missing topics. >>>>>>>>>>>>> >>>>>>>>>>>>>> Small change: >>>>>>>>>>>>>> >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for brevity >>>>>>> and >>>>>>>>>>>>> simplicity. >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think! >>>>>>>>>>>>> >>>>>>>>>>>>> The name change sounds good to me. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks Alieh! >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> I thank you all again for your useful >> questions/suggestions. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated >> in >>>>>>>> some >>>>>>>>>>>>> feedback. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Alieh >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks Alieh for the updates. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It >>>>>>> seems >>>>>>>>>> like >>>>>>>>>>> we >>>>>>>>>>>>> want >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic >>>>>>> handler. >>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we >>>>>>> want >>>>>>>>> to >>>>>>>>>>>>> handle, >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic thing >> for >>>>>>>> two >>>>>>>>>>>> specific >>>>>>>>>>>>>>> errors. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve >>>>>>>> these >>>>>>>>>>>>> problems. I >>>>>>>>>>>>>>> agree though that we will need something more than the >> error >>>>>>>>>> classes >>>>>>>>>>>> I'm >>>>>>>>>>>>>>> proposing if we want to have different handling be >>>>>>>> configurable. >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means >>>>>>> that >>>>>>>> we >>>>>>>>>> are >>>>>>>>>>>>>>> creating more problems than we are solving. It is still >>>>>>>> unclear >>>>>>>>> to >>>>>>>>>>> me >>>>>>>>>>>>> how >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could include >> an >>>>>>>>>> example? >>>>>>>>>>>> It >>>>>>>>>>>>>>> seems like there is a specific use case in mind and maybe >> we >>>>>>>> can >>>>>>>>>>> make >>>>>>>>>>>> a >>>>>>>>>>>>>>> design that is tighter and supports that case. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Justine >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True < >>>>>>> k...@kirktrue.pro> >>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Alieh, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the KIP! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> A few questions: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it >>>>>>>>>> generates >>>>>>>>>>> a >>>>>>>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY? >>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for >>>>>>> the >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException since we get that >> response >>>>>>>>> when >>>>>>>>>>>>>>> sending a >>>>>>>>>>>>>>>> batch of records? >>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method >>>>>>>> itself >>>>>>>>>>>> throws >>>>>>>>>>>>> an >>>>>>>>>>>>>>>> error? >>>>>>>>>>>>>>>> K4. What is the downside of adding an onError() method to >>>>>>> the >>>>>>>>>>>>> Producer’s >>>>>>>>>>>>>>>> Callback interface vs. a new mechanism? >>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse" to >>>>>>> just >>>>>>>>>>>> “Response” >>>>>>>>>>>>>>>> given that it’s an inner enum? >>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle >>>>>>>> different >>>>>>>>>>>>> behavior >>>>>>>>>>>>>>>> for different topics? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me >>>>>>> understand >>>>>>>>> why >>>>>>>>>> we >>>>>>>>>>>>> want >>>>>>>>>>>>>>>> another handler class if there were more examples in the >>>>>>>>>> Motivation >>>>>>>>>>>>>>>> section. As it stands now, I agree with Chris that the >>>>>>> stated >>>>>>>>>>> issues >>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>> be solved by adding two new configuration options: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> oversized.record.behavior=fail >>>>>>>>>>>>>>>> retry.on.unknown.topic.or.partition=true >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what >>>>>>> exactly >>>>>>>>>> would >>>>>>>>>>>> the >>>>>>>>>>>>>>>> logic in the handler be? I’m not very imaginative, so I’m >>>>>>>>>> assuming >>>>>>>>>>>>> they’d >>>>>>>>>>>>>>>> mostly be if-this-then-that. However, if they’re more >>>>>>>>>> complicated, >>>>>>>>>>>> I’d >>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>> other concerns. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> Kirk >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi >>>>>>>>>>>>> <asae...@confluent.io.INVALID >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thank you all for the feedback! 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving the >>>>>>>> user >>>>>>>>>> the >>>>>>>>>>>>>>> ability >>>>>>>>>>>>>>>>> to handle producer exceptions, but to be more >> conservative >>>>>>>> and >>>>>>>>>>> avoid >>>>>>>>>>>>>>>> future >>>>>>>>>>>>>>>>> issues, we decided to be limited to a short list of >>>>>>>>> exceptions. >>>>>>>>>> I >>>>>>>>>>>>>>>> included >>>>>>>>>>>>>>>>> *RecordTooLargeExceptin* and >>>>>>>>> *UnknownTopicOrPartitionException. >>>>>>>>>>>> *Open >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> suggestion for adding some more ;-) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> KIP Updates: >>>>>>>>>>>>>>>>> - clarified the way that the user should configure the >>>>>>>>> Producer >>>>>>>>>> to >>>>>>>>>>>> use >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> custom handler. I think adding a producer config >> property >>>>>>> is >>>>>>>>> the >>>>>>>>>>>>>>> cleanest >>>>>>>>>>>>>>>>> one. >>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to >>>>>>>>>>> *ProducerExceptionHandler* >>>>>>>>>>>>> to >>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>> closer to what we are changing. >>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of the >>>>>>>>>> handle() >>>>>>>>>>>>>>> method >>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>> well. >>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and two >>>>>>>> types >>>>>>>>>> of >>>>>>>>>>>>>>>> continue. >>>>>>>>>>>>>>>>> - The default behaviour is having no custom handler, >>>>>>> having >>>>>>>>> the >>>>>>>>>>>>>>>>> corresponding config parameter set to null. Therefore, >> the >>>>>>>> KIP >>>>>>>>>>>>> provides >>>>>>>>>>>>>>>> no >>>>>>>>>>>>>>>>> default implementation of the interface. >>>>>>>>>>>>>>>>> - We follow the interface solution as described in the >>>>>>>>>>>>>>>>> Rejected Alternetives section. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>> Alieh >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax < >>>>>>>>>> mj...@apache.org >>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important >> case >>>>>>>> for >>>>>>>>>>> error >>>>>>>>>>>>>>>>>> handling. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert API, >>>>>>> as >>>>>>>>>>>> mentioned >>>>>>>>>>>>>>> by >>>>>>>>>>>>>>>>>> a few people. But I don't think it would be a reason to >>>>>>> not >>>>>>>>> add >>>>>>>>>>> it. >>>>>>>>>>>>>>> It's >>>>>>>>>>>>>>>>>> always a tricky tradeoff what to expose to users and to >>>>>>>> avoid >>>>>>>>>>> foot >>>>>>>>>>>>>>> guns, >>>>>>>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and >> have >>>>>>>> good >>>>>>>>>>>>>>> experience >>>>>>>>>>>>>>>>>> with it. Hence, I understand, but don't share the >> concern >>>>>>>>>> raised. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I also agree that there is some responsibility by the >>>>>>> user >>>>>>>> to >>>>>>>>>>>>>>> understand >>>>>>>>>>>>>>>>>> how such a handler should be implemented to not drop >> data >>>>>>>> by >>>>>>>>>>>>> accident. >>>>>>>>>>>>>>>>>> But it seem unavoidable and acceptable. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg >> via >>>>>>>>>>> configs) >>>>>>>>>>>>>>> might >>>>>>>>>>>>>>>>>> also work, I personally prefer a full handler. 
Configs >>>>>>> have >>>>>>>>> the >>>>>>>>>>>> same >>>>>>>>>>>>>>>>>> issue that they could be miss-used potentially leading >> to >>>>>>>>>>>> incorrectly >>>>>>>>>>>>>>>>>> dropped data, but at the same time are less flexible >> (and >>>>>>>>> thus >>>>>>>>>>>> maybe >>>>>>>>>>>>>>>>>> ever harder to use correctly...?). Base on my >> experience, >>>>>>>>> there >>>>>>>>>>> is >>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>> often weird corner case for which it make sense to also >>>>>>>> drop >>>>>>>>>>>> records >>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>> other exceptions, and a full handler has the advantage >> of >>>>>>>>> full >>>>>>>>>>>>>>>>>> flexibility and "absolute power!". >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the >>>>>>>> producer >>>>>>>>>> in >>>>>>>>>>>>>>>>>> details, so please keep me honest. But my understanding >>>>>>> is, >>>>>>>>>> that >>>>>>>>>>>> the >>>>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>>>> aims to allow users to react to internal exception, and >>>>>>>>> decide >>>>>>>>>> to >>>>>>>>>>>>> keep >>>>>>>>>>>>>>>>>> retrying internally, swallow the error and drop the >>>>>>> record, >>>>>>>>> or >>>>>>>>>>>> raise >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> error? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more >> precises >>>>>>>>> what >>>>>>>>>>>> error >>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> want to cover -- I don't think this list must be >>>>>>>> exhaustive, >>>>>>>>> as >>>>>>>>>>> we >>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>> always do follow up KIP to also apply the handler to >>>>>>> other >>>>>>>>>> errors >>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> expand the scope of the handler. The KIP does mention >>>>>>>>> examples, >>>>>>>>>>> but >>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>> might be good to explicitly state for what cases the >>>>>>>> handler >>>>>>>>>> gets >>>>>>>>>>>>>>>> applied? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough >>>>>>> options? >>>>>>>>>> Don't >>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> need three options? Or would `CONTINUE` have different >>>>>>>>> meaning >>>>>>>>>>>>>>> depending >>>>>>>>>>>>>>>>>> on the type of error? Ie, for a retryable error >>>>>>> `CONTINUE` >>>>>>>>>> would >>>>>>>>>>>> mean >>>>>>>>>>>>>>>>>> keep retrying internally, but for a non-retryable error >>>>>>>>>>> `CONTINUE` >>>>>>>>>>>>>>> means >>>>>>>>>>>>>>>>>> swallow the error and drop the record? This semantic >>>>>>>> overload >>>>>>>>>>> seems >>>>>>>>>>>>>>>>>> tricky to reason about by users, so it might better to >>>>>>>> split >>>>>>>>>>>>>>> `CONTINUE` >>>>>>>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better >>>>>>>>> names). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Additionally, should we just ship a >>>>>>>>>>> `DefaultClientExceptionHandler` >>>>>>>>>>>>>>>>>> which would return `FAIL`, for backward compatibility. >> Or >>>>>>>>> don't >>>>>>>>>>>> have >>>>>>>>>>>>>>> any >>>>>>>>>>>>>>>>>> default handler to begin with and allow it to be >> `null`? >>>>>>> I >>>>>>>>>> don't >>>>>>>>>>>> see >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To >> me, >>>>>>>> the >>>>>>>>>>> goal >>>>>>>>>>>>>>>>>> should be to not modify the default behavior at all, >> but >>>>>>> to >>>>>>>>>> just >>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>>> users to change the default behavior if there is a >> need. 
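
To make the three-way split concrete, a handler written against a single generic handle() method (as the KIP proposed at this point in the discussion) might look like the following; all names here are placeholders, not a committed API:

    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    // Placeholder sketch: explicit checks per exception plus a single FAIL default,
    // so new error types never get silently swallowed.
    class ExampleHandlerSketch {

        enum Response { RETRY, SWALLOW, FAIL }

        Response handle(Exception exception) {
            if (exception instanceof RecordTooLargeException) {
                return Response.SWALLOW;   // drop the oversized record and carry on
            }
            if (exception instanceof UnknownTopicOrPartitionException) {
                return Response.RETRY;     // possibly transient: keep retrying internally
            }
            return Response.FAIL;          // anything unexpected fails, preserving default behaviour
        }
    }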
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> What is missing on the KIP though it, how the handler >> is >>>>>>>>> passed >>>>>>>>>>>> into >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> producer thought? Would we need a new config which >> allows >>>>>>>> to >>>>>>>>>> set >>>>>>>>>>> a >>>>>>>>>>>>>>>>>> custom handler? And/or would we allow to pass in an >>>>>>>> instance >>>>>>>>>> via >>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> constructor or add a new method to set a handler? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote: >>>>>>>>>>>>>>>>>>> Hi Alieh, >>>>>>>>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer >> is >>>>>>>>>> really >>>>>>>>>>>> not >>>>>>>>>>>>>>>>>> ideal. >>>>>>>>>>>>>>>>>>> It’s even harder working out what’s going on with the >>>>>>>>>> consumer. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with >> Chris >>>>>>>> that >>>>>>>>>> it >>>>>>>>>>>>> could >>>>>>>>>>>>>>>> do >>>>>>>>>>>>>>>>>> with additional >>>>>>>>>>>>>>>>>>> motivation. This would be an expert-level interface >>>>>>> given >>>>>>>>> how >>>>>>>>>>>>>>>> complicated >>>>>>>>>>>>>>>>>>> the exception handling for Kafka has become. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 7. The application is not really aware of the batching >>>>>>>> being >>>>>>>>>>> done >>>>>>>>>>>> on >>>>>>>>>>>>>>>> its >>>>>>>>>>>>>>>>>> behalf. >>>>>>>>>>>>>>>>>>> The ProduceResponse can actually return an array of >>>>>>>> records >>>>>>>>>>> which >>>>>>>>>>>>>>>> failed >>>>>>>>>>>>>>>>>>> per batch. If you get RecordTooLargeException, and >> want >>>>>>> to >>>>>>>>>>> retry, >>>>>>>>>>>>> you >>>>>>>>>>>>>>>>>> probably >>>>>>>>>>>>>>>>>>> need to remove the offending records from the batch >> and >>>>>>>>> retry >>>>>>>>>>> it. >>>>>>>>>>>>>>> This >>>>>>>>>>>>>>>>>> is getting fiddly. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I >>>>>>>>> wonder >>>>>>>>>>>>> whether >>>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>> alternative might be to add a method to the existing >>>>>>>>> Callback >>>>>>>>>>>>>>>> interface, >>>>>>>>>>>>>>>>>> such as: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> ClientExceptionResponse onException(Exception >>>>>>> exception) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> It would be called when a ProduceResponse contains an >>>>>>>> error, >>>>>>>>>> but >>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> producer is going to retry. It tells the producer >>>>>>> whether >>>>>>>> to >>>>>>>>>> go >>>>>>>>>>>>> ahead >>>>>>>>>>>>>>>>>> with the retry >>>>>>>>>>>>>>>>>>> or not. The default implementation would be to >> CONTINUE, >>>>>>>>>> because >>>>>>>>>>>>>>> that’s >>>>>>>>>>>>>>>>>>> just continuing to retry as planned. Note that this >> is a >>>>>>>>>>>> per-record >>>>>>>>>>>>>>>>>> callback, so >>>>>>>>>>>>>>>>>>> the application would be able to understand which >>>>>>> records >>>>>>>>>>> failed. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> By using an existing interface, we already know how to >>>>>>>>>> configure >>>>>>>>>>>> it >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> we know >>>>>>>>>>>>>>>>>>> about the threading model for calling it. 
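
A minimal sketch of the alternative in point 8 might look like the following. ClientExceptionResponse and the sub-interface are hypothetical; the actual suggestion is to add the method to Callback itself, and it is split out here only so the sketch is self-contained:

    import org.apache.kafka.clients.producer.Callback;

    // Hypothetical response type from this thread.
    enum ClientExceptionResponse { CONTINUE, FAIL }

    interface RetryAwareCallback extends Callback {

        // Called per record when a ProduceResponse carries an error the producer
        // is about to retry; the default simply continues retrying, as today.
        default ClientExceptionResponse onException(Exception exception) {
            return ClientExceptionResponse.CONTINUE;
        }
    }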
Thanks,
Andrew


On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is particularly frustrating for users of Kafka Connect and has been the source of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected alternatives) section is that this kind of custom retry logic can't be implemented by hand by, e.g., setting retries to 0 in the producer config and handling exceptions at the application level. Or rather, it can, but 1) it's a bit painful to have to reimplement at every call-site for Producer::send (and any code that awaits the returned Future), and 2) it's impossible to do this without losing idempotency on retries. (A sketch of this hand-rolled approach appears after this list.)

2. That said, I wonder if a pluggable interface is really the right call here. Documenting the interactions of a producer with a ClientExceptionHandler instance will be tricky, and implementing them will also be a fair amount of work. I believe that there needs to be some more granularity for how writes to non-existent topics (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled, but I'm torn between keeping it simple with maybe one or two new producer config properties, or a full-blown pluggable interface. If there are more cases that would benefit from a pluggable interface, it would be nice to identify these and add them to the KIP to strengthen the motivation. Right now, I'm not sure the two that are mentioned in the motivation are sufficient.
3. Alternatively, a possible compromise is for this KIP to introduce new properties that dictate how to handle unknown-topic-partition and record-too-large errors, with the thinking that if we introduce a pluggable interface later on, these properties will be recognized by the default implementation of that interface but could be completely ignored or replaced by alternative implementations.

4. (Nit) You can remove the "This page is meant as a template for writing a KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the possibility for retry logic? The simplest version of this could be to add a RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for the ClientExceptionHandlerResponse enum, since they cause records to be dropped.

Cheers,

Chris
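To make point 1 above concrete, the hand-rolled alternative means wrapping every Producer::send in something roughly like the sketch below. The retry bound, the backoff, and the choice of UnknownTopicOrPartitionException as the retriable case are made up for illustration; it also only works with retries=0 and idempotence disabled, which is exactly the loss described in drawback 2:

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    // Assumes the producer was configured with retries=0 and
    // enable.idempotence=false, so every failure surfaces here.
    final class AppLevelRetry {
        static RecordMetadata sendWithRetry(Producer<String, String> producer,
                                            ProducerRecord<String, String> record,
                                            int maxAttempts) throws Exception {
            for (int attempt = 1; ; attempt++) {
                try {
                    return producer.send(record).get();
                } catch (ExecutionException e) {
                    // Depending on timing, a missing topic may also surface as a
                    // TimeoutException while metadata is awaited.
                    boolean retriable = e.getCause() instanceof UnknownTopicOrPartitionException;
                    if (!retriable || attempt >= maxAttempts) {
                        throw e;
                    }
                    Thread.sleep(1000L); // crude backoff before the next attempt
                }
            }
        }
    }

A wrapper like this (or an equivalent inline block) has to be invoked at every call-site, which is the pain point the KIP is trying to remove.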
On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

I echo what Omnia says, I'm not sure I understand the implications of the change and I think more detail is needed.

This comment also confused me a bit:
* {@code ClientExceptionHandler} that continues the transaction even if a record is too large.
* Otherwise, it makes the transaction to fail.

Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for the RecordTooLargeException (and a few other errors), we want to give a new error category for this error that allows the application to choose how it is handled. Maybe this KIP is something that you are looking for? Stay tuned :)

Justine


On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Alieh,
Thanks for the KIP! I have a couple of comments.

- You mentioned in the KIP motivation:
> Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop.

How can the handler differentiate between something that is temporary, where it should keep retrying, and something permanent like forgetting to create the topic? Temporary here could be:
  - the producer gets deployed before the topic creation finishes (especially if topic creation is handled via IaC)
  - temporarily offline partitions
  - leadership changing
Isn't this putting the producer at risk of dropping records unintentionally?

- Can you elaborate on the compatibility / migration plan section, please, by explaining in a bit more detail what the changing behaviour is and how it will impact clients who are upgrading?

- In the proposed changes, can you elaborate in the KIP where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler will get triggered, and how the producer will be configured to point to a custom implementation?

Thanks
Omnia
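If the KIP ends up exposing the handler through a producer config property, the wiring Omnia asks about could look something like the snippet below; the property name "exception.handler.class" and the handler class name are purely hypothetical placeholders, since nothing like them is defined by the KIP at this point in the discussion:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Hypothetical wiring only: neither the config key nor the handler class
    // below is defined by the KIP at this stage.
    public class HandlerWiringExample {
        public static KafkaProducer<String, String> buildProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put("exception.handler.class", "com.example.MyExceptionHandler");
            return new KafkaProducer<>(props);
        }
    }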
On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,
Here is the KIP-1038: Add Custom Error Handler to Producer.
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>

I look forward to your feedback!

Cheers,
Alieh