I'm not sure I agree with the RetriableResponse and NonRetriableResponse comment. This doesn't limit the blast radius or enforce that only certain errors are used. I think we might disagree on how controlled these interfaces can be...
Justine On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid> wrote: > Hi Alieh, > > Thanks for the updates! I just have a few more thoughts: > > - I don't think a boolean property is sufficient to dictate retries for > unknown topic partitions, though. These errors can occur if a topic has > just been created, which can occur if, for example, automatic topic > creation is enabled for a multi-task connector. This is why I proposed a > timeout instead of a boolean (and see my previous email for why reducing > max.block.ms for a producer is not a viable alternative). If it helps, one > way to reproduce this yourself is to add the line > `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here: > > https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134 > and then check the logs afterward for messages like "Error while fetching > metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}". > > - I also don't think we need custom XxxResponse enums for every possible > method; it seems like this will lead to a lot of duplication and cognitive > overhead if we want to expand the error handler in the future. Something > more flexible like RetriableResponse and NonRetriableResponse could > suffice. > > - Finally, the KIP still doesn't state how the handler will or won't take > precedence over existing retry properties. If I set `retries` or ` > delivery.timeout.ms` or `max.block.ms` to low values, will that cause > retries to cease even if my custom handler would otherwise keep returning > RETRY for an error? > > Cheers, > > Chris > > On Mon, May 13, 2024 at 11:02 AM Andrew Schofield < > andrew_schofi...@live.com> > wrote: > > > Hi Alieh, > > Just a few more comments on the KIP. It is looking much less risky now > the > > scope > > is tighter. 
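A minimal Java sketch of the RetriableResponse / NonRetriableResponse shape suggested above. It assumes the producer routes each error to one of two hypothetical methods, `handleRetriable` and `handleNonRetriable` (the names Alieh uses further down the thread), depending on whether the error is retriable. `SketchRecord` is a stand-in for `ProducerRecord` so the example compiles without kafka-clients. None of this is the KIP's actual API.

```java
import java.util.Set;

// Hypothetical response types: RETRY exists only for retriable errors.
enum RetriableResponse { RETRY, FAIL, SWALLOW }

enum NonRetriableResponse { FAIL, SWALLOW }

// Stand-in for ProducerRecord; only the topic matters here.
final class SketchRecord {
    final String topic;

    SketchRecord(String topic) {
        this.topic = topic;
    }
}

// Hypothetical handler: the producer would call one method or the other based on the error type.
interface SplitProducerExceptionHandler {
    // Assumed default: keep retrying, mirroring today's behavior for retriable errors.
    default RetriableResponse handleRetriable(Exception e, SketchRecord record) {
        return RetriableResponse.RETRY;
    }

    // Assumed default: fail, mirroring today's behavior for non-retriable errors.
    default NonRetriableResponse handleNonRetriable(Exception e, SketchRecord record) {
        return NonRetriableResponse.FAIL;
    }
}

// Example policy: keep retrying transient errors on important topics, skip the record elsewhere.
// Non-retriable errors keep the FAIL default.
final class ImportantTopicsHandler implements SplitProducerExceptionHandler {
    private final Set<String> importantTopics;

    ImportantTopicsHandler(Set<String> importantTopics) {
        this.importantTopics = importantTopics;
    }

    @Override
    public RetriableResponse handleRetriable(Exception e, SketchRecord record) {
        return importantTopics.contains(record.topic)
                ? RetriableResponse.RETRY
                : RetriableResponse.SWALLOW;
    }
}
```

Here the return type, not a comment, carries the rule that RETRY is only legal for retriable errors. In the Kafka client, `UnknownTopicOrPartitionException` is a subclass of `RetriableException` while `RecordTooLargeException` is not, so that split could plausibly drive the routing. Alieh's objection still applies: overriding the wrong method compiles fine and simply has no effect.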
> > > > [AJS1] It would be nice to have default implementations of the handle > > methods > > so an implementor would not need to implement both themselves. > > > > [AJS2] Producer configurations which are class names usually end in > > “.class”. > > I suggest “custom.exception.handler.class”. > > > > [AJS3] If I implemented a handler, and I set a non-default value for one > > of the > > new configurations, what happens? I would expect that the handler takes > > precedence. I wasn’t quite clear what “the control will follow the > handler > > instructions” meant. > > > > [AJS4] Because you now have an enum for the > > RecordTooLargeExceptionResponse, > > I don’t think you need to state in the comment for > > ProducerExceptionHandler that > > RETRY will be interpreted as FAIL. > > > > Thanks, > > Andrew > > > > > On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> > > wrote: > > > > > > Hi all, > > > > > > > > > Thanks for the very interesting discussion during my PTO. > > > > > > > > > KIP updates and addressing concerns: > > > > > > > > > 1) Two handle() methods are defined in ProducerExceptionHandler for the > > two > > > exceptions with different input parameters so that we have > > > handle(RecordTooLargeException e, ProducerRecord record) and > > > handle(UnknownTopicOrPartitionException e, ProducerRecord record) > > > > > > > > > 2) The ProducerExceptionHandler extends `Closeable` as well. > > > > > > > > > 3) The KIP suggests having two more configuration parameters with > boolean > > > values: > > > > > > - `drop.invalid.large.records` with a default value of `false` for > > > swallowing too-large records. > > > > > > - `retry.unknown.topic.partition` with a default value of `true` that > > > performs RETRY for `max.block.ms` ms when encountering the > > > UnknownTopicOrPartitionException. > > > > > > > > > I hope the main concerns are addressed so that we can go forward with > > voting.
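Points 1 and 2 above, together with [AJS1] and [AJS4], could combine into an interface along these lines. This is a sketch under assumptions: the exception and record classes are local stand-ins for the Kafka types, the per-exception enum names follow [AJS4], and the RETRY default for unknown topics assumes the "retry for `max.block.ms`" behavior Alieh describes as the default.

```java
import java.io.Closeable;

// Local stand-ins for org.apache.kafka.common.errors.* and ProducerRecord, so the sketch
// compiles without kafka-clients on the classpath.
class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

final class ProducerRecordStub {
    final String topic;

    ProducerRecordStub(String topic) {
        this.topic = topic;
    }
}

// Per-exception responses ([AJS4]): RETRY is not even representable for an oversized record.
enum RecordTooLargeExceptionResponse { FAIL, SWALLOW }

enum UnknownTopicOrPartitionExceptionResponse { FAIL, SWALLOW, RETRY }

interface ProducerExceptionHandler extends Closeable {
    // Default methods ([AJS1]) let implementors override only the case they care about.
    default RecordTooLargeExceptionResponse handle(RecordTooLargeException e, ProducerRecordStub record) {
        return RecordTooLargeExceptionResponse.FAIL;
    }

    // Assumed default: keep retrying (bounded elsewhere by max.block.ms), as described in the thread.
    default UnknownTopicOrPartitionExceptionResponse handle(UnknownTopicOrPartitionException e,
                                                            ProducerRecordStub record) {
        return UnknownTopicOrPartitionExceptionResponse.RETRY;
    }

    // Narrowing Closeable.close() to throw no checked exception keeps callers simple.
    @Override
    default void close() { }
}
```

Because the two `handle` methods are overloads on the exception type, the compiler picks the right one, and an implementor who only cares about oversized records overrides a single method.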
> > > > > > > > > Cheers, > > > > > > Alieh > > > > > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits > > > <alivsh...@confluent.io.invalid> wrote: > > > > > >> Hi Matthias, > > >> > > >>> [AL1] While I see the point, I would think having a different > callback > > >> for every exception might not really be elegant? > > >> > > >> I'm not sure how to assess the level of elegance of the proposal, but > I > > can > > >> comment on the technical characteristics: > > >> > > >> 1. Having specific interfaces that codify the logic that is currently > > >> prescribed in the comments reduces the chance of making a mistake. > > >> Comments may get ignored, misunderstood, etc., but if the contract is > > >> codified, the compiler will help to enforce the contract. > > >> 2. Given that the logic is trickier than it seems (the > record-too-large > > is > > >> an example that can easily confuse someone who's not intimately > familiar > > >> with the nuances of the batching logic), having a few more hoops to > > jump through > > >> would give a greater chance that whoever tries to add new cases > pauses > > >> and thinks a bit more. > > >> 3. As Justine pointed out, having different methods will be a forcing > > >> function to go through a KIP rather than smuggle new cases through > > >> implementation. > > >> 4. Sort of a consequence of the previous 3 -- all those things reduce > > the > > >> chance of someone writing code that works with 2 errors and then > > when > > >> more errors are added in the future will suddenly incorrectly ignore > new > > >> errors (the example I gave in the previous email). > > >> > > >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as > > >> business logic.
If a user puts a bad filter condition in their KS app, > > and > > >> drops messages > > >> > > >> I agree that there is always a chance to get a bug and lose messages, > > but > > >> there is generally a separation of concerns with different risk > > profiles: > > >> the filtering logic may be more rigorously tested and rarely changed > > (say > > >> an application developer does it), but setting the topics to produce > > may be > > >> done via configuration (e.g. a user of the application does it) and > it's > > >> generally an expectation that users would get an error when > > configuration > > >> is incorrect. > > >> > > >> What could be worse is that UnknownTopicOrPartitionException can be an > > >> intermittent error, i.e. with a generally correct configuration, there > > >> could be a metadata propagation problem on the cluster and then a random > > set > > >> of records could get lost. > > >> > > >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking > > the > > >> size of the record upfront is exactly what the KIP proposes? No? > > >> > > >> It achieves the same result but solves it differently. My proposal: > > >> > > >> 1. Application checks the validity of a record (maybe via a new > > >> validateRecord method) before producing it, and can just exclude it or > > >> return an error to the user. > > >> 2. Application produces the record -- at this point there are no > records > > >> that could return record-too-large; they were either skipped at step 1 > > or > > >> we didn't get here because step 1 failed. > > >> > > >> Vs. KIP's proposal > > >> > > >> 1. Application produces the record. > > >> 2. Application gets a callback. > > >> 3. Application returns the action on how to proceed.
> > >> > > >> The advantage of the former is the clarity of semantics -- the record > is > > >> invalid (property of the record, not a function of server state or > > server > > >> configuration) and we can clearly know that it is the record that is > bad > > >> and can never succeed. > > >> > > >> The KIP-proposed way actually has a very tricky point: it actually > > handles > > >> a subset of record-too-large exceptions. The broker can return > > >> record-too-large and reject the whole batch (but we don't want to > ignore > > >> those because then we can skip random records that just happened to be > > in > > >> the same batch); in some sense we use the same error for 2 different > > >> conditions and understanding that requires pretty deep understanding > of > > >> Kafka internals. > > >> > > >> -Artem > > >> > > >> > > >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan > > <jols...@confluent.io.invalid > > >>> > > >> wrote: > > >> > > >>> My concern with respect to it being fragile: the code that ensures > the > > >>> error type is internal to the producer. Someone may see it and say, I > > >> want > > >>> to add such and such error. This looks like internal code, so I don't > > >> need > > >>> a KIP, and then they can change it to whatever they want thinking it > is > > >>> within the typical Kafka improvement protocol. > > >>> > > >>> Relying on an internal change to enforce an external API is fragile > in > > my > > >>> opinion. That's why I sort of agreed with Artem about enforcing the > > error > > >> in > > >>> the method signature -- part of the public API. > > >>> > > >>> Chris's comments on providing more information to the handler again make > > me > > >>> wonder if we are solving a problem of lack of information at the > > >>> application level with a more powerful solution than we need.
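Artem's alternative of validating before producing could look roughly like this on the application side. `RecordSizeValidator` and the fixed per-record overhead are assumptions: the real producer computes the serialized batch size itself. Only `max.request.size` and its 1048576-byte default are real producer settings.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical application-side check in the spirit of the proposed validateRecord method.
final class RecordSizeValidator {
    private final int maxRequestSize;        // same value as the producer's max.request.size
    private final int assumedOverheadBytes;  // rough allowance for record and batch headers (assumption)

    RecordSizeValidator(int maxRequestSize, int assumedOverheadBytes) {
        this.maxRequestSize = maxRequestSize;
        this.assumedOverheadBytes = assumedOverheadBytes;
    }

    // True if the serialized key/value plus the assumed overhead fits under the limit.
    boolean isValid(byte[] key, byte[] value) {
        long size = (long) assumedOverheadBytes
                + (key == null ? 0 : key.length)
                + (value == null ? 0 : value.length);
        return size <= maxRequestSize;
    }

    // Convenience overload for UTF-8 string payloads.
    boolean isValid(String key, String value) {
        return isValid(key == null ? null : key.getBytes(StandardCharsets.UTF_8),
                       value == null ? null : value.getBytes(StandardCharsets.UTF_8));
    }
}
```

A local check like this only sees the producer's own limit. A topic's `max.message.bytes` or the broker's `message.max.bytes` can still reject the batch, which is the second, batch-level flavor of record-too-large that Artem says should not be swallowed.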
(Ie, if > > we > > >>> had more information, could the application close and restart the > > >>> transaction rather than having to drop records) But I am happy to > > >>> compromise with a handler that we can agree is sufficiently > controlled > > >> and > > >>> documented. > > >>> > > >>> Justine > > >>> > > >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid > > > > >>> wrote: > > >>> > > >>>> Hi Alieh, > > >>>> > > >>>> Continuing prior discussions: > > >>>> > > >>>> 1) Regarding the "flexibility" discussion, my overarching point is > > >> that I > > >>>> don't see the point in allowing for this kind of pluggable logic > > >> without > > >>>> also covering more scenarios. Take example 2 in the KIP: if we're > > going > > >>> to > > >>>> implement retries only on "important" topics when a topic partition > > >> isn't > > >>>> found, why wouldn't we also want to be able to do this for other > > >> errors? > > >>>> Again, taking authorization errors as an example, why wouldn't we > want > > >> to > > >>>> be able to fail when we can't write to "important" topics because > the > > >>>> producer principal lacks sufficient ACLs, and drop the record if the > > >>> topic > > >>>> isn't "important"? In a security-conscious environment with > > >>>> runtime-dependent topic routing (which is a common feature of many > > >> source > > >>>> connectors, such as the Debezium connectors), this seems fairly > > likely. > > >>>> > > >>>> 2) As far as changing the shape of the API goes, I like Artem's idea > > of > > >>>> splitting out the interface based on specific exceptions. This may > be > > a > > >>>> little laborious to expand in the future, but if we really want to > > >>>> limit the exceptions that we cover with the handler and move slowly > > and > > >>>> cautiously, then IMO it'd be reasonable to reflect that in the > > >>> interface. 
I > > >>>> also acknowledge that there's no way to completely prevent people > from > > >>>> shooting themselves in the foot by implementing the API incorrectly, > > >> but > > >>> I > > >>>> think it's worth it to do what we can--including leveraging the Java > > >>>> language's type system--to help them, so IMO there's value to > > >> eliminating > > >>>> the implicit behavior of failing when a policy returns RETRY for a > > >>>> non-retriable error. This can take a variety of shapes and I'm not > > >> going > > >>> to > > >>>> insist on anything specific, but I do want to again raise my > concerns > > >>> with > > >>>> the current proposal and request that we find something a little > > >> better. > > >>>> > > >>>> 3) Concerning the default implementation--actually, I meant what I > > >> wrote > > >>> :) > > >>>> I don't want a "second" default, I want an implementation of this > > >>> interface > > >>>> to be used as the default if no others are specified. The behavior > of > > >>> this > > >>>> default implementation would be identical to existing behavior (so > > >> there > > >>>> would be no backwards compatibility concerns like the ones raised by > > >>>> Matthias), but it would be possible to configure this default > handler > > >>> class > > >>>> to behave differently for a basic set of scenarios. This would > mirror > > >>> (pun > > >>>> intended) the approach we've taken with Mirror Maker 2 and its > > >>>> ReplicationPolicy interface [1]. There is a default implementation > > >>>> available [2] that recognizes a handful of basic configuration > > >> properties > > >>>> [3] for simple tweaks, but if users want, they can also implement > > their > > >>> own > > >>>> replication policy for more fine-grained logic if those properties > > >> aren't > > >>>> flexible enough. 
> > >>>> > > >>>> More concretely, I'm imagining something like this for the producer > > >>>> exception handler: > > >>>> > > >>>> - Default implementation class > > >>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler > > >>>> - This class would recognize two properties: > > >>>> - drop.invalid.large.records: Boolean property, defaults to false. > If > > >>>> "false", then causes the handler to return FAIL whenever > > >>>> a RecordTooLargeException is encountered; if "true", then causes > > >>>> SWALLOW/SKIP/DROP to be returned instead > > >>>> - unknown.topic.partition.retry.timeout.ms: Integer property, > > >> defaults > > >>>> to > > >>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is > encountered, > > >>>> causes the handler to return FAIL if that record has been pending > for > > >>> more > > >>>> than the retry timeout; otherwise, causes RETRY to be returned > > >>>> > > >>>> I think this is worth addressing now instead of later because it > > forces > > >>> us > > >>>> to evaluate the usefulness of this interface and it addresses a > > >>>> long-standing issue not just with Kafka Connect, but with the Java > > >>> producer > > >>>> in general. For reference, here are a few tickets I collected after > > >>> briefly > > >>>> skimming our Jira showing that this is a real pain point for users: > > >>>> https://issues.apache.org/jira/browse/KAFKA-10340, > > >>>> https://issues.apache.org/jira/browse/KAFKA-12990, > > >>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is > > >>>> frequently reported with Kafka Connect, it applies to anyone who > > >>> configures > > >>>> a producer to use a high retry timeout. 
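A sketch of the `DefaultProducerExceptionHandler` Chris describes, with the two properties he lists. Assumptions: the handler is told how long the record has been pending (one of the options raised in point 6 below, not something the KIP defines), configs arrive as strings, and the class and method names are hypothetical. A real implementation would more likely implement Kafka's `Configurable` and receive a `Map<String, ?>`.

```java
import java.util.Map;

enum Response { FAIL, SWALLOW, RETRY }

// Hypothetical default handler whose behavior is tuned by two properties.
final class DefaultProducerExceptionHandlerSketch {
    static final String DROP_LARGE_CONFIG = "drop.invalid.large.records";
    static final String UNKNOWN_TOPIC_TIMEOUT_CONFIG = "unknown.topic.partition.retry.timeout.ms";

    // Defaults chosen to match the behavior Chris describes: fail large records, retry "forever".
    private boolean dropInvalidLargeRecords = false;
    private long unknownTopicRetryTimeoutMs = Integer.MAX_VALUE;

    // Only overrides properties that are present, so an empty map keeps the defaults.
    void configure(Map<String, String> configs) {
        String drop = configs.get(DROP_LARGE_CONFIG);
        if (drop != null) {
            dropInvalidLargeRecords = Boolean.parseBoolean(drop);
        }
        String timeout = configs.get(UNKNOWN_TOPIC_TIMEOUT_CONFIG);
        if (timeout != null) {
            unknownTopicRetryTimeoutMs = Long.parseLong(timeout);
        }
    }

    Response handleRecordTooLarge() {
        return dropInvalidLargeRecords ? Response.SWALLOW : Response.FAIL;
    }

    // pendingMs: how long the record has been waiting since send() (assumed to be supplied).
    Response handleUnknownTopicOrPartition(long pendingMs) {
        return pendingMs > unknownTopicRetryTimeoutMs ? Response.FAIL : Response.RETRY;
    }
}
```

With no properties set, the sketch keeps today's behavior; users who need something finer-grained would replace the class, as with MirrorMaker 2's `ReplicationPolicy`.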
I am aware of the > > max.block.ms > > >>>> property, but it's painful and IMO poor behavior to require users to > > >>> reduce > > >>>> the value of this property just to handle the single scenario when > > >> trying > > >>>> to write to topics that don't exist, since it would also limit the > > >> retry > > >>>> timeout for other operations that are legitimately retriable. > > >>>> > > >>>> Raising new points: > > >>>> > > >>>> 5) I don't see the interplay between this handler and existing > > >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming > > >> that > > >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms > " > > >>> would > > >>>> take precedence over the handler and once they are exhausted, the > > >>>> record/batch will fail no matter what? If so, it's probably worth > > >> briefly > > >>>> mentioning this (no more than a sentence or two) in the KIP, and if > > >> not, > > >>>> I'm curious what you have in mind. > > >>>> > > >>>> 6) I also wonder if the API provides enough information in its > current > > >>>> form. Would it be possible to provide handlers with some way of > > >> tracking > > >>>> how long a record has been pending for (i.e., how long it's been > since > > >>> the > > >>>> record was provided as an argument to Producer::send)? One way to do > > >> this > > >>>> could be to add a method like `onNewRecord(ProducerRecord)` and > > >>>> allow/require the handler to do its own bookkeeping, probably with a > > >>>> matching `onRecordSuccess(ProducerRecord)` method so that the > handler > > >>>> doesn't eat up an ever-increasing amount of memory trying to track > > >>> records. 
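Point 5 assumes the existing limits win over the handler. Encoded as a small decision function (hypothetical names; `max.block.ms` left out for brevity), that assumption means a RETRY from the handler only leads to another attempt while both the `retries` count and `delivery.timeout.ms` still allow one. This illustrates the question being asked; it is not behavior the KIP specifies.

```java
enum HandlerResponse { FAIL, SWALLOW, RETRY }

enum Outcome { RETRY, FAIL, DROP }

final class RetryPrecedence {
    // Assumption from point 5: existing producer limits take precedence over the handler.
    static Outcome decide(HandlerResponse response, long elapsedMs, long deliveryTimeoutMs,
                          int attempts, int maxRetries) {
        switch (response) {
            case SWALLOW:
                return Outcome.DROP;
            case FAIL:
                return Outcome.FAIL;
            default:
                // RETRY is honored only while both the time and attempt budgets remain.
                boolean budgetLeft = elapsedMs < deliveryTimeoutMs && attempts < maxRetries;
                return budgetLeft ? Outcome.RETRY : Outcome.FAIL;
        }
    }
}
```

Under this reading, setting `delivery.timeout.ms` or `retries` low would cut retries short even if the handler keeps answering RETRY, which is exactly the interplay Chris asks the KIP to state.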
> > >>>> An alternative could be to include information about the initial > time > > >> the > > >>>> record was received by the producer and the number of retries that > > have > > >>>> been performed for it as parameters in the handle method(s), but I'm > > >> not > > >>>> sure how easy this would be to implement and it might clutter things > > >> up a > > >>>> bit too much. > > >>>> > > >>>> 7) A small request--can we add Closeable (or, if you prefer, > > >>> AutoCloseable) > > >>>> as a superinterface for the handler interface? > > >>>> > > >>>> [1] - > > >>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html > > >>>> [2] - > > >>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html > > >>>> [3] - > > >>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG > > >>>> , > > >>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG > > >>>> > > >>>> Cheers, > > >>>> > > >>>> Chris > > >>>> > > >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> > > >>> wrote: > > >>>> > > >>>>> Very interesting discussion. > > >>>>> > > >>>>> Seems a central point is the question about "how generic" we > approach > > >>>>> this, and some people think we need to be conservative and others > > >> think > > >>>>> we should try to be as generic as possible. > > >>>>> > > >>>>> Personally, I think following a limited scope for this KIP (by > > >>>>> explicitly saying we only cover RecordTooLarge and > > >>>>> UnknownTopicOrPartition) might be better. 
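Point 6's bookkeeping idea, sketched as a helper a handler could own. `onNewRecord` and `onRecordSuccess` are the hypothetical hooks named in the thread. The injectable clock and the identity-keyed map are assumptions of this sketch: identity keys keep two records that compare equal from sharing one timestamp, and removing entries on success keeps memory bounded. Methods are synchronized because the producer's completion callbacks generally run on its I/O thread rather than the thread calling `send()`.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical helper tracking how long each record has been pending.
final class PendingRecordTracker<R> {
    private final Map<R, Long> firstSeenMs = new IdentityHashMap<>();
    private final LongSupplier clockMs;

    PendingRecordTracker(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Hypothetical hook: called when a record is handed to send().
    synchronized void onNewRecord(R record) {
        firstSeenMs.putIfAbsent(record, clockMs.getAsLong());
    }

    // Hypothetical hook: called once the record is acknowledged, releasing its entry.
    // A real version would also need a matching hook for records that finally fail.
    synchronized void onRecordSuccess(R record) {
        firstSeenMs.remove(record);
    }

    // How long the record has been pending, or -1 if it is not tracked.
    synchronized long pendingMs(R record) {
        Long start = firstSeenMs.get(record);
        return start == null ? -1L : clockMs.getAsLong() - start;
    }

    synchronized int trackedCount() {
        return firstSeenMs.size();
    }
}
```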
We have concrete tickets > > >> that > > >>>>> we address, while for other exceptions (like authorization) we don't > > >>> know > > >>>>> if people want to handle them to begin with. Boiling the ocean might > > >> not > > >>>>> get us too far, and being somewhat pragmatic might help to move > this > > >>> KIP > > >>>>> forward. -- I also agree with Justine and Artem, that we want to be > > >>>>> careful anyway to not allow users to break stuff too easily. > > >>>>> > > >>>>> At the same time, I agree that we should set up this change in a > > >> forward > > >>>>> looking way, and thus having a single generic handler allows us to > > >>> later > > >>>>> extend the handler more easily. This should also simplify follow-up > > >> KIPs > > >>>>> that might add new error cases (I actually mentioned one more to > > >> Alieh > > >>>>> already, but we both agreed that it might be best to exclude it > from > > >>> the > > >>>>> KIP right now, to make the 3.8 deadline. Doing a follow-up KIP is > not > > >>>>> the end of the world.) > > >>>>> > > >>>>> > > >>>>> > > >>>>> @Chris: > > >>>>> > > >>>>> (2) This sounds fair to me, but not sure how "bad" it actually > would > > >>> be? > > >>>>> If the contract is clearly defined, it seems to be fine what the > KIP > > >>>>> proposes, and given that such a handler is an expert API, and we > can > > >>>>> provide "best practices" (cf my other comment below in [AL1]), > being > > >> a > > >>>>> little bit pragmatic sounds fine to me. > > >>>>> > > >>>>> Not sure if I understand Justine's argument on this question? > > >>>>> > > >>>>> > > >>>>> (3) About having a default impl or not. I am fine with adding one, > > >> even > > >>>>> if I am not sure why Connect could not just add its own one and > plug > > >> it > > >>>>> in (and we would add corresponding configs for Connect, but not for > > >> the > > >>>>> producer itself)?
For this case, we could also do this as a follow > up > > >>>>> KIP, but happy to include it in this KIP to provide value to > Connect > > >>>>> right away (even if the value might not come right away if we miss > > >> the > > >>>>> 3.8 deadline due to expanded KIP scope...) -- For KS, we would for > > >>> sure > > >>>>> plug in our own impl, and lock down the config such that users > cannot > > >>> set > > >>>>> their own handler on the internal producer to begin with. Might be > > >> good > > >>>>> to elaborate why the producer should have a default? We might > > >> actually > > >>>>> want to add this to the KIP right away? > > >>>>> > > >>>>> The key for a default impl would be not to change the current > > >>> behavior, > > >>>>> and having no default seems to achieve this. For the two cases you > > >>>>> mentioned, it's unclear to me what default value on "upper bound on > > >>>>> retries" for UnknownTopicOrPartitionException we should set? Seems > it > > >>>>> would need to be the same as `delivery.timeout.ms`? However, if > > >> users > > >>>>> have `delivery.timeout.ms` actually overridden we would need to > set > > >>>>> this config somewhat "dynamically"? Is this feasible? If we hard-code 2 > > >>>>> minutes, it might not be backward compatible. I have the impression > > >> we > > >>>>> might introduce some undesired coupling? -- For the "record too > > >> large" > > >>>>> case, the config seems to be boolean and setting it to `false` by > > >>>>> default seems to provide backward compatibility. > > >>>>> > > >>>>> > > >>>>> > > >>>>> @Artem: > > >>>>> > > >>>>> [AL1] While I see the point, I would think having a different > > >> callback > > >>>>> for every exception might not really be elegant? In the end, the > > >>> handler > > >>>>> is a very advanced feature anyway, and if it's implemented in a > bad > > >>>>> way, well, it's a user error -- we cannot protect users from > > >>> everything.
> > >>>>> To me, a handler like this is to some extent "business logic" and > > >> if a > > >>>>> user gets business logic wrong, it's hard to protect them. -- We > > >> would > > >>>>> of course provide best practice guidance in the JavaDocs, and > explain > > >>>>> that a handler should have explicit `if` statements for stuff it > wants > > >>> to > > >>>>> handle, and only a single default which returns FAIL. > > >>>>> > > >>>>> > > >>>>> [AL2] Yes, but for KS we would retry at the application layer. Ie, > > >> the > > >>>>> TX is not completed, we clean up and set up our task from scratch, > to > > >>>>> ensure the pending transaction is completed before we resume. If > the > > >> TX > > >>>>> was indeed aborted, we would retry from an older offset and thus just > > >> hit > > >>>>> the same error again and the loop begins again. > > >>>>> > > >>>>> > > >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as > > >>>>> business logic. If a user puts a bad filter condition in their KS > > >> app, > > >>>>> and drops messages, there's nothing we can do about it, and this handler > > >>>>> IMHO has a similar purpose. This is also the line of thinking I > > >> apply > > >>>>> to EOS, to address Justine's concern about "should we allow to drop > > >> for > > >>>>> EOS", and my answer is "yes", because it's more business logic than > > >>>>> actual error handling IMHO. And by default, we fail... So users > > >> opt in > > >>>>> to add business logic to drop records. It's an application-level > > >>>>> decision how to write the code. > > >>>>> > > >>>>> > > >>>>> [AL3] Maybe I misunderstand what you are saying, but to me, > checking > > >>> the > > >>>>> size of the record upfront is exactly what the KIP proposes? No? > > >>>>> > > >>>>> > > >>>>> > > >>>>> @Justine: > > >>>>> > > >>>>>> I saw the sample > > >>>>>> code -- is it just an if statement checking for the error before > > >> the > > >>>>>> handler is invoked? That seems a bit fragile.
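The best-practice shape Matthias describes (explicit `if` per handled exception, one FAIL fallback) can be contrasted with the catch-all pattern Artem warns about in [AL1] just below. Everything here is a stand-in: a generic `handle(Exception, String topic)` signature modeled on the draft under discussion, and local exception classes instead of the Kafka ones. `IllegalStateException` plays the part of an exception type routed to the handler in some future release.

```java
// Decisions a handler can return in this sketch.
enum Decision { FAIL, SWALLOW, RETRY }

// Local stand-ins for the two Kafka exceptions the KIP currently covers.
class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

// Generic signature modeled on the draft under discussion (hypothetical).
interface GenericProducerExceptionHandler {
    Decision handle(Exception e, String topic);
}

// Anti-pattern from [AL1]: correct for today's two exceptions, but it also swallows
// anything routed to the handler in a later release.
final class CatchAllSwallowHandler implements GenericProducerExceptionHandler {
    @Override
    public Decision handle(Exception e, String topic) {
        return Decision.SWALLOW;
    }
}

// Recommended shape: one explicit branch per handled exception, a single FAIL fallback.
final class ExplicitHandler implements GenericProducerExceptionHandler {
    @Override
    public Decision handle(Exception e, String topic) {
        if (e instanceof RecordTooLargeException) {
            return Decision.SWALLOW;
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            return Decision.RETRY;
        }
        return Decision.FAIL; // anything not explicitly handled fails loudly
    }
}
```

Both handlers behave the same on today's two exceptions; they differ only once a new exception type reaches them, which is the upgrade scenario in [AL1].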
> > >>>>> > > >>>>> What do you mean by fragile? Not sure if I see your point. > > >>>>> > > >>>>> > > >>>>> > > >>>>> > > >>>>> -Matthias > > >>>>> > > >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote: > > >>>>>> Hi Alieh, > > >>>>>> > > >>>>>> Thanks for the KIP. The motivation talks about very specific > > >> cases, > > >>>> but > > >>>>>> the interface is generic. > > >>>>>> > > >>>>>> [AL1] > > >>>>>> If the interface evolves in the future I think we could have the > > >>>>> following > > >>>>>> confusion: > > >>>>>> > > >>>>>> 1. A user implemented SWALLOW action for both > > >> RecordTooLargeException > > >>>> and > > >>>>>> UnknownTopicOrPartitionException. For simplicity they just return > > >>>> SWALLOW > > >>>>>> from the function, because it elegantly handles all known cases. > > >>>>>> 2. The interface has evolved to support a new exception. > > >>>>>> 3. The user has upgraded their Kafka client. > > >>>>>> > > >>>>>> Now a new kind of error gets dropped on the floor without the user's > > >>>>> intention > > >>>>>> and it would be super hard to detect and debug. > > >>>>>> > > >>>>>> To avoid the confusion, I think we should use handlers for > specific > > >>>>>> exceptions. Then if a new exception is added it won't get > silently > > >>>>> swallowed > > >>>>>> because the user would need to add new functionality to handle it.
> > >>>>>> > > >>>>>> I also have some higher-level comments: > > >>>>>> > > >>>>>> [AL2] > > >>>>>>> it throws a TimeoutException, and the user can only blindly > retry, > > >>>> which > > >>>>>> may result in an infinite retry loop > > >>>>>> > > >>>>>> If the TimeoutException happens during transactional processing > > >>>> (exactly > > >>>>>> once is the desired semantics), then the client should not retry > > >> when > > >>>> it > > >>>>>> gets TimeoutException because without knowing the reason for > > >>>>>> the TimeoutException, the client cannot know whether the message was > > >>>>> actually > > >>>>>> produced or not and retrying the message may result in > duplicates. > > >>>>>> > > >>>>>>> The thrown TimeoutException "cuts" the connection to the > > >> underlying > > >>>> root > > >>>>>> cause of missing metadata > > >>>>>> > > >>>>>> Maybe we should fix the error handling and return the proper > > >>> underlying > > >>>>>> message? Then the application can properly handle the message > > >> based > > >>> on > > >>>>>> preferences. > > >>>>>> > > >>>>>> From the product perspective, it's not clear how safe it is to > > >>> blindly > > >>>>>> ignore UnknownTopicOrPartitionException. This could lead to > > >>> situations > > >>>>>> where a simple typo could lead to massive data loss (part of the > > >> data > > >>>>> would > > >>>>>> effectively be produced to a "black hole" and the user may not > > >> notice > > >>>> it > > >>>>>> for a while). > > >>>>>> > > >>>>>> In which situations would you recommend that the user "black hole" > > >>>> messages > > >>>>>> in case of misconfiguration?
> > >>>>>> > > >>>>>> [AL3] > > >>>>>> > > >>>>>>> If the custom handler decides on SWALLOW for > > >>> RecordTooLargeException, > > >>>>>> > > >>>>>> Is my understanding correct that this KIP proposes functionality > > >> that > > >>>>> would > > >>>>>> only be able to SWALLOW RecordTooLargeExceptions that happen > because > > >>> the > > >>>>>> producer cannot produce the record (if the broker rejects the > > >> batch, > > >>>> the > > >>>>>> error won't get to the handler, because we cannot know which other > > >>>>> records > > >>>>>> get ignored)? In this case, why not just check the locally > > >>> configured > > >>>>> max > > >>>>>> record size upfront and not produce the record in the first > place? > > >>>>> Maybe > > >>>>>> we can expose a validation function from the producer that could > > >>>> validate > > >>>>>> the records locally, so we don't need to produce the record in > > >> order > > >>> to > > >>>>>> know that it's invalid. > > >>>>>> > > >>>>>> -Artem > > >>>>>> > > >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan > > >>>>> <jols...@confluent.io.invalid> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> Alieh and Chris, > > >>>>>>> > > >>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just > > >>>> didn't > > >>>>>>> understand how that would be ensured on the producer side. I saw > > >> the > > >>>>> sample > > >>>>>>> code -- is it just an if statement checking for the error before > > >> the > > >>>>>>> handler is invoked? That seems a bit fragile. > > >>>>>>> > > >>>>>>> Can you clarify what you mean by `since the code does not reach > > >> the > > >>> KS > > >>>>>>> interface and breaks somewhere in producer.` If we surfaced this > > >>> error > > >>>>> to > > >>>>>>> the application in a better way, would that also be a solution to > > >> the > > >>>>> issue?
> > >>>>>>> > > >>>>>>> Justine > > >>>>>>> > > >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi > > >>>>> <asae...@confluent.io.invalid> > > >>>>>>> wrote: > > >>>>>>> > > >>>>>>>> Hi, > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Thank you, Chris and Justine, for the feedback. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> @Chris > > >>>>>>>> > > >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the > one > > >>> you > > >>>>>>>> mentioned. We are going to cover more exceptions in the future, > > >> but > > >>>> as > > >>>>>>>> Justine mentioned, we must be very conservative about adding > more > > >>>>>>>> exceptions. Additionally, flexibility mainly means that the user > > >> is > > >>>>> able > > >>>>>>> to > > >>>>>>>> develop their own code. As mentioned in the motivation section > > >> and > > >>>> the > > >>>>>>>> examples, sometimes the user decides on dropping a record based > > >> on > > >>>> the > > >>>>>>>> topic, for example. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 2) Defining two separate methods for retriable and non-retriable > > >>>>>>>> exceptions: although the idea is brilliant, the user may still > > >>> make a > > >>>>>>>> mistake by implementing the wrong method and see unexpected > > >>>>>>> behaviour. > > >>>>>>>> For example, he may implement handleRetriable() for > > >>>>>>> RecordTooLargeException > > >>>>>>>> and define SWALLOW for the exception, but in practice, he sees > no > > >>>>> change > > >>>>>>> in > > >>>>>>>> default behaviour since he implemented the wrong method. I think > > >> we > > >>>>> can > > >>>>>>>> never reduce the user’s mistakes to 0.
What > > >> you > > >>>>> mean > > >>>>>>> is > > >>>>>>>> actually having a second default, which requires having both > > >>>> interface > > >>>>>>> and > > >>>>>>>> config parameters. About UnknownTopicOrPartitionException: the > > >>>> producer > > >>>>>>>> already offers the config parameter `max.block.ms` which > > >>> determines > > >>>>> the > > >>>>>>>> duration of retrying. The main purpose of the user who needs the > > >>>>> handler > > >>>>>>> is > > >>>>>>>> to get the root cause of TimeoutException and handle it in the > > >> way > > >>> he > > >>>>>>>> intends. The KIP explains the necessity of it for KS users. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the > error, > > >>>> while > > >>>>>>>> SKIP means skip the record, I think. If it makes sense for more > > >>> ppl, > > >>>> I > > >>>>>>> can > > >>>>>>>> change it to SKIP > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> @Justine > > >>>>>>>> > > >>>>>>>> 1) was addressed by Chris. > > >>>>>>>> > > >>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently, > > >>> there > > >>>> is > > >>>>>>> no > > >>>>>>>> way to handle these issues application-side. Even KS users who > > >>>>> implement > > >>>>>>> KS > > >>>>>>>> ProductionExceptionHandler are not able to handle the exceptions > > >> as > > >>>>> they > > >>>>>>>> intend since the code does not reach the KS interface and breaks > > >>>>>>> somewhere > > >>>>>>>> in producer. > > >>>>>>>> > > >>>>>>>> Cheers, > > >>>>>>>> Alieh > > >>>>>>>> > > >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton < > > >>>> fearthecel...@gmail.com> > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hi Justine, > > >>>>>>>>> > > >>>>>>>>> The method signatures for the interface are indeed open-ended, > > >> but > > >>>> the > > >>>>>>>> KIP > > >>>>>>>>> states that its uses will be limited. 
> > >>>>>>>>> See the motivation section:
> > >>>>>>>>>
> > >>>>>>>>>> We believe that the user should be able to develop custom exception handlers for managing producer exceptions. On the other hand, this will be an expert-level API, and using that may result in strange behaviour in the system, making it hard to find the root cause. Therefore, the custom handler is currently limited to handling RecordTooLargeException and UnknownTopicOrPartitionException.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Chris
> > >>>>>>>>>
> > >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>
> > >>>>>>>>>> I was out for KSB and then was also sick. :(
> > >>>>>>>>>>
> > >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two specific scenarios, since the interface accepts a generic Exception e and can be implemented to check if that e is an instanceof any exception. I didn't see anywhere that specific errors are enforced. I'm a bit concerned about this actually. I'm concerned about the open-endedness and the contract we have with transactions. We are allowing the client to make decisions that are somewhat invisible to the server. As an aside, can we build in log messages when the handler decides to skip etc. a message? I'm really concerned about messages being silently dropped.
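Justine makes two points here. First, a single `handle(ProducerRecord, Exception)` method can match any exception with `instanceof`. Second, drops should be logged. The minimal Java sketch below illustrates both. It is not the KIP's code. `SimpleRecord`, the stand-in exception class, and the exact shapes of `ProducerExceptionHandler` and `Response` are assumptions made so the example compiles without kafka-clients. Only the names `handle()`, FAIL, RETRY and SWALLOW come from this thread.

```java
import java.util.logging.Logger;

// Stand-in so the sketch compiles without kafka-clients (hypothetical).
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

// Hypothetical minimal record carrying only what the handler inspects.
final class SimpleRecord {
    final String topic;
    final int sizeInBytes;

    SimpleRecord(String topic, int sizeInBytes) {
        this.topic = topic;
        this.sizeInBytes = sizeInBytes;
    }
}

enum Response { FAIL, RETRY, SWALLOW }

interface ProducerExceptionHandler {
    Response handle(SimpleRecord record, Exception exception);
}

// Drops oversized records but logs every drop, so nothing disappears silently.
final class LoggingExceptionHandler implements ProducerExceptionHandler {
    private static final Logger LOG =
            Logger.getLogger(LoggingExceptionHandler.class.getName());
    private int dropped = 0;

    @Override
    public Response handle(SimpleRecord record, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            dropped++;
            LOG.warning("Dropping " + record.sizeInBytes + "-byte record for topic "
                    + record.topic + ": " + exception.getMessage());
            return Response.SWALLOW;
        }
        // Nothing in the signature restricts which exceptions arrive here,
        // so everything else falls back to the default behaviour.
        return Response.FAIL;
    }

    int droppedCount() { return dropped; }
}
```

The signature alone does not limit which errors reach the handler. Any such limit has to be enforced by the producer deciding when to call it.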
> > >>>>>>>>>>
> > >>>>>>>>>> I do think Chris's point 2) about retriable vs non retriable errors is fair. I'm a bit concerned about skipping an unknown topic or partition exception too early, as there are cases where it can be transient.
> > >>>>>>>>>>
> > >>>>>>>>>> I'm still a little bit wary of allowing dropping records as part of EOS generally as in many cases, these errors signify an issue with the original data. I understand that streams and connect/mirror maker may have reasons they want to progress past these messages, but wondering if there is a way that can be done application-side. I'm willing to accept this sort of proposal if we can make it clear that this sort of thing is happening and we limit the blast radius for what we can do.
> > >>>>>>>>>>
> > >>>>>>>>>> Justine
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some thoughts that I'd like to see addressed before voting.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface, why are we only limiting the uses for this interface to two very specific scenarios?
> > >>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled as well (allowing users to drop records destined for some off-limits topics, or retry for a limited duration in case there's a delay in the propagation of ACL updates)? It'd be nice to see some analysis of other errors that could be handled with this new API, both to avoid the follow-up work of another KIP to address them in the future, and to make sure that we're not painting ourselves into a corner with the current API in a way that would make future modifications difficult.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2) Something feels a bit off with how retriable vs. non-retriable errors are handled with the interface. Why not introduce two separate methods to handle each case separately? That way there's no ambiguity or implicit behavior when, e.g., attempting to retry on a RecordTooLargeException. This could be something like `NonRetriableResponse handle(ProducerRecord, Exception)` and `RetriableResponse handleRetriable(ProducerRecord, Exception)`, though the exact names and shape can obviously be toyed with a bit.
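The two-method shape Chris proposes in point 2 might look roughly like the sketch below. The enum constants are hypothetical (`DROP` follows his naming preference in point 4), as are the names `TwoMethodHandler`, `HandlerDispatch` and `Action`. The exception classes are stand-ins that only mirror the real kafka-clients hierarchy. The sketch shows that the response types make RETRY impossible to express for a non-retriable error.

```java
// Stand-ins mirroring kafka-clients: there, UnknownTopicOrPartitionException
// is (via InvalidMetadataException) a RetriableException, while
// RecordTooLargeException is not.
class RetriableException extends RuntimeException {
    RetriableException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RetriableException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

// Hypothetical response types: RETRY simply does not exist for non-retriable errors.
enum NonRetriableResponse { FAIL, DROP }
enum RetriableResponse { RETRY, FAIL, DROP }

// What the producer would actually do after consulting the handler.
enum Action { RETRY, FAIL, DROP }

interface TwoMethodHandler<R> {
    NonRetriableResponse handle(R record, Exception exception);
    RetriableResponse handleRetriable(R record, Exception exception);
}

final class HandlerDispatch {
    // The producer, not the user, decides which method applies to an error.
    static <R> Action dispatch(TwoMethodHandler<R> handler, R record, Exception error) {
        if (error instanceof RetriableException) {
            return Action.valueOf(handler.handleRetriable(record, error).name());
        }
        return Action.valueOf(handler.handle(record, error).name());
    }
}
```

Alieh raises a counterpoint in her reply: this design does not stop an implementer from putting logic in the method that never gets called for a given error. It only rules out nonsensical responses.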
> > >>>>>>>>>>>
> > >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may benefit some users' custom producer applications and Kafka Streams applications, it comes at significant deployment cost for other low-/no-code environments, including but not limited to Kafka Connect and MirrorMaker 2. Can we add a default implementation of the exception handler that allows for some simple behavior to be tweaked via configuration property? Two things that would be nice to have would be A) an upper bound on the retry time for unknown-topic-partition exceptions and B) an option to drop records that are large enough to trigger a record-too-large exception.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed "SWALLOW" option, which IMO is opaque and non-obvious, especially when trying to guess the behavior for retriable errors.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Chris
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> A summary of the KIP and the discussions:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The KIP introduces a handler interface for Producer in order to handle two exceptions: RecordTooLargeException and UnknownTopicOrPartitionException. The handler handles the exceptions per-record.
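Chris's point 3 asks for a default handler whose behaviour is tweaked by configuration, with (A) a bound on retry time for unknown topics and (B) an opt-in drop of oversized records. The following is a minimal sketch under several assumptions:

- The property names `unknown.topic.retry.timeout.ms` and `drop.oversized.records` are invented for illustration and do not exist in Kafka.
- The handler receives only the topic name rather than a full `ProducerRecord`.
- A `LongSupplier` clock is injected so the time bound can be tested.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Stand-ins so the sketch compiles without kafka-clients (hypothetical).
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

enum Response { FAIL, RETRY, SWALLOW }

final class ConfigurableDefaultHandler {
    // Hypothetical property names; they do not exist in Kafka.
    static final String RETRY_TIMEOUT_MS = "unknown.topic.retry.timeout.ms";
    static final String DROP_OVERSIZED = "drop.oversized.records";

    private final long retryTimeoutMs;
    private final boolean dropOversized;
    private final LongSupplier clockMs;
    // When each topic was first reported missing; bounds the retry time (A).
    private final Map<String, Long> firstMissingMs = new HashMap<>();

    ConfigurableDefaultHandler(Map<String, String> props, LongSupplier clockMs) {
        // Without the properties: never drop, and keep returning RETRY.
        this.retryTimeoutMs = Long.parseLong(
                props.getOrDefault(RETRY_TIMEOUT_MS, String.valueOf(Long.MAX_VALUE)));
        this.dropOversized = Boolean.parseBoolean(props.getOrDefault(DROP_OVERSIZED, "false"));
        this.clockMs = clockMs;
    }

    Response handle(String topic, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Option (B): drop oversized records only if explicitly enabled.
            return dropOversized ? Response.SWALLOW : Response.FAIL;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            long now = clockMs.getAsLong();
            long first = firstMissingMs.computeIfAbsent(topic, t -> now);
            return (now - first < retryTimeoutMs) ? Response.RETRY : Response.FAIL;
        }
        return Response.FAIL;
    }
}
```

For brevity, the sketch never forgets a topic once it has been seen missing. A real implementation would presumably reset that entry after a successful send.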
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Do we need this handler? [Motivation and Examples sections]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer collects multiple records in batches. Then a RecordTooLargeException related to a single record leads to failing the entire batch. A custom exception handler in this case may decide on dropping the record and continuing the processing. See Example 1, please. 2) Moreover, in Kafka Streams, a record that is too large is a poison pill record, and there is no way to skip over it. A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly. Please read the Motivation section for more explanation.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer handles this exception internally and only issues a WARN log about missing metadata and retries internally. Later, when the producer hits "delivery.timeout.ms" it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop.
> > >>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop. Kafka Streams also blindly retries for this case, and the application gets stuck.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Having interface vs configuration option: [Motivation, Examples, and Rejected Alternatives sections]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Our solution is introducing an interface due to the full flexibility that it offers. Sometimes users, especially Kafka Streams ones, determine the handler's behaviour based on the situation. For example, facing UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry it for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces section]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException, then this record will not be a part of the batch of transactions and will also not be sent to the broker in non-transactional mode.
> > >>>>>>>>>>>> So no worries about getting a RecordTooLargeException from the broker in this case, as the record will never ever be sent to the broker. SWALLOW means drop the record and continue/swallow the error.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - What if the handle() method implements RETRY for RecordTooLargeException? [Proposed Changes section]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for RecordTooLargeException. Actually, RETRY must be equal to FAIL. This is documented in the javadoc.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - What if the handle() method of the handler throws an exception?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The handler is expected to have correct code. If it throws an exception, everything fails.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for RecordTooLargeException. The code changes related to UnknownTopicOrPartitionException will be added to this PR LATER.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Looking forward to your feedback again.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the updates!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Comments inline...
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Addressing some of the main concerns:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by broker, producer and consumer. Of course, the `ProducerExceptionHandler` interface is introduced to affect only the exceptions thrown from the producer. This KIP very specifically means to provide a possibility to manage the `RecordTooLargeException` thrown from the Producer.send() method. Please see “Proposed Changes” section for more clarity. I investigated the issue there thoroughly. I hope it can explain the concern about how we handle the errors as well.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called when the record sent to the server is acknowledged, while this is not the desired time for all exceptions. We intend to handle exceptions beforehand.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when Callback is invoked as-is vs. shoehorning more into it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - What if the custom handler returns RETRY for `RecordTooLargeException`? I assume changing the producer configuration at runtime is possible. If so, RETRY for a too large record is valid because maybe in the next try, the too large record is not poisoning any more. I am not 100% sure about the technical details, though. Otherwise, we can consider the RETRY as FAIL for this exception. Another solution would be to consider a constant number of times for RETRY which can be useful for other exceptions as well.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> It’s not presently possible to change the configuration of an existing Producer at runtime. So if a record hits a RecordTooLargeException once, no amount of retrying (with the current Producer) will change that fact. So I’m still a little stuck on how to handle a response of RETRY for an “oversized” record.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I think rationally and pragmatically, the behaviour must be exactly like when no custom handler is defined since the user actually did not have a working handler.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the right choice. It then becomes a silent failure that might have repercussions, depending on the business logic. A user would have to proactively trawl through the logs for WARN/ERROR messages to catch it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird and limiting. Forget I ever suggested it ;)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I’d think additional background in the Motivation section would help me understand how users might use this feature beyond a) skipping “oversized” records, and b) not retrying missing topics.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Small change:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for brevity and simplicity. Could’ve been HandlerResponse too I think!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The name change sounds good to me.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks Alieh!
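Kirk's objections above point to one possible producer-side step that resolves the handler's answer. He argues that RETRY cannot help an oversized record because producer config is fixed at runtime, and that an errant handler should not fail silently. The sketch below shows this step. It is one reading, not the KIP's implementation. `ResponseResolver` is hypothetical. Treating a throwing handler as FAIL and logging it at SEVERE is an assumption, chosen as a middle ground between ignoring the handler and throwing a hard error.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in so the sketch compiles without kafka-clients (hypothetical).
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

enum Response { FAIL, RETRY, SWALLOW }

interface ProducerExceptionHandler<R> {
    Response handle(R record, Exception exception);
}

// Hypothetical step that turns the handler's answer into the action taken.
final class ResponseResolver {
    private static final Logger LOG = Logger.getLogger(ResponseResolver.class.getName());

    static <R> Response resolve(ProducerExceptionHandler<R> handler, R record, Exception error) {
        Response response;
        try {
            response = handler.handle(record, error);
        } catch (RuntimeException handlerFailure) {
            // A broken handler is not ignored: the send fails and the
            // handler's own exception is logged next to the original error.
            LOG.log(Level.SEVERE, "Exception handler threw while handling " + error, handlerFailure);
            return Response.FAIL;
        }
        // Producer config cannot change at runtime, so retrying an oversized
        // record cannot succeed; RETRY is downgraded to FAIL.
        if (error instanceof RecordTooLargeException && response == Response.RETRY) {
            return Response.FAIL;
        }
        // A handler returning null is treated like a failure as well.
        return response == null ? Response.FAIL : response;
    }
}
```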
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I thank you all again for your useful questions/suggestions.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated in some feedback.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks Alieh for the updates.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It seems like we want specific usages, but we are packaging it as a generic handler. I think we tried to narrow down on the specific errors we want to handle, but it feels a little clunky as we have a generic thing for two specific errors.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve these problems. I agree though that we will need something more than the error classes I'm proposing if we want to have different handling be configurable. My concern is that the open-endedness of a handler means that we are creating more problems than we are solving. It is still unclear to me how we expect to handle the errors. Perhaps we could include an example?
> > >>>>>>>>>>>>>>> It seems like there is a specific use case in mind and maybe we can make a design that is tighter and supports that case.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the KIP!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> A few questions:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it generates a RecordTooLargeException, but the handler returns RETRY?
> > >>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for the UnknownTopicOrPartitionException since we get that response when sending a batch of records?
> > >>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method itself throws an error?
> > >>>>>>>>>>>>>>>> K4. What is the downside of adding an onError() method to the Producer’s Callback interface vs. a new mechanism?
> > >>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse” to just “Response” given that it’s an inner enum?
> > >>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle different behavior for different topics?
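Kirk's K6 asks how to handle different topics differently. The same need lies behind the "raise an error for some topics but retry for others" case in Example 2. A per-topic policy for it could be as small as the sketch below. The explicit topic set, the prefix rule, and passing only the topic name are all hypothetical. Without a time bound like the one Chris asks for, the RETRY branch could still loop for a long time.

```java
import java.util.Set;

// Stand-in so the sketch compiles without kafka-clients (hypothetical).
class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

enum Response { FAIL, RETRY, SWALLOW }

// Hypothetical policy: retry only for topics the application expects to
// appear later; fail fast for anything else (for example, a mistyped name).
final class PerTopicHandler {
    private final Set<String> retryTopics;
    private final String retryPrefix;

    PerTopicHandler(Set<String> retryTopics, String retryPrefix) {
        this.retryTopics = Set.copyOf(retryTopics);
        this.retryPrefix = retryPrefix;
    }

    Response handle(String topic, Exception exception) {
        if (exception instanceof UnknownTopicOrPartitionException) {
            boolean expected = retryTopics.contains(topic) || topic.startsWith(retryPrefix);
            return expected ? Response.RETRY : Response.FAIL;
        }
        return Response.FAIL;
    }
}
```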
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me understand why we want another handler class if there were more examples in the Motivation section. As it stands now, I agree with Chris that the stated issues could be solved by adding two new configuration options:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> oversized.record.behavior=fail
> > >>>>>>>>>>>>>>>> retry.on.unknown.topic.or.partition=true
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what exactly would the logic in the handler be? I’m not very imaginative, so I’m assuming they’d mostly be if-this-then-that. However, if they’re more complicated, I’d have other concerns.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Kirk
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thank you all for the feedback!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving the user the ability to handle producer exceptions, but to be more conservative and avoid future issues, we decided to be limited to a short list of exceptions.
> > >>>>>>>>>>>>>>>>> I included *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to suggestion for adding some more ;-)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> KIP Updates:
> > >>>>>>>>>>>>>>>>> - clarified the way that the user should configure the Producer to use the custom handler. I think adding a producer config property is the cleanest one.
> > >>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be closer to what we are changing.
> > >>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of the handle() method as well.
> > >>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and two types of continue.
> > >>>>>>>>>>>>>>>>> - The default behaviour is having no custom handler, having the corresponding config parameter set to null. Therefore, the KIP provides no default implementation of the interface.
> > >>>>>>>>>>>>>>>>> - We follow the interface solution as described in the Rejected Alternatives section.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important case for error handling.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert API, as mentioned by a few people. But I don't think it would be a reason to not add it. It's always a tricky tradeoff what to expose to users and to avoid foot guns, but we added similar handlers to Kafka Streams, and have good experience with it. Hence, I understand, but don't share the concern raised.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I also agree that there is some responsibility by the user to understand how such a handler should be implemented to not drop data by accident. But it seems unavoidable and acceptable.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg via configs) might also work, I personally prefer a full handler. Configs have the same issue that they could be misused, potentially leading to incorrectly dropped data, but at the same time are less flexible (and thus maybe even harder to use correctly...?).
> > >>>>>>>>>>>>>>>>>> Based on my experience, there are also often weird corner cases for which it makes sense to also drop records for other exceptions, and a full handler has the advantage of full flexibility and "absolute power!".
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the producer in detail, so please keep me honest. But my understanding is that the KIP aims to allow users to react to internal exceptions, and decide to keep retrying internally, swallow the error and drop the record, or raise the error?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more precise about what errors we want to cover -- I don't think this list must be exhaustive, as we can always do a follow-up KIP to also apply the handler to other errors to expand the scope of the handler. The KIP does mention examples, but it might be good to explicitly state for what cases the handler gets applied?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough options? Don't we need three options?
> > >>>>>>>>>>>>>>>>>> Or would `CONTINUE` have different meaning depending on the type of error? Ie, for a retryable error `CONTINUE` would mean keep retrying internally, but for a non-retryable error `CONTINUE` means swallow the error and drop the record? This semantic overload seems tricky to reason about by users, so it might be better to split `CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better names).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Additionally, should we just ship a `DefaultClientExceptionHandler` which would return `FAIL`, for backward compatibility? Or don't have any default handler to begin with and allow it to be `null`? I don't see the need for a specific `TransactionExceptionHandler`. To me, the goal should be to not modify the default behavior at all, but to just allow users to change the default behavior if there is a need.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> What is missing from the KIP, though, is how the handler is passed into the producer. Would we need a new config which allows setting a custom handler?
And/or would we allow passing in an instance via the constructor, or add a new method to set a handler?

-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal. It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with additional motivation. This would be an expert-level interface given how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf. The ProduceResponse can actually return an array of records which failed per batch. If you get RecordTooLargeException and want to retry, you probably need to remove the offending records from the batch and retry it. This is getting fiddly.

8. There is already o.a.k.clients.producer.Callback.
I wonder whether an alternative might be to add a method to the existing Callback interface, such as:

    ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the producer is going to retry. It tells the producer whether to go ahead with the retry or not. The default implementation would be to CONTINUE, because that’s just continuing to retry as planned. Note that this is a per-record callback, so the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it and we know about the threading model for calling it.

Thanks,
Andrew

On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:

Hi Alieh,

Thanks for the KIP!
The issue with writing to non-existent topics is particularly frustrating for users of Kafka Connect and has been the source of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected alternatives) section is that this kind of custom retry logic can't be implemented by hand by, e.g., setting retries to 0 in the producer config and handling exceptions at the application level. Or rather, it can, but 1) it's a bit painful to have to reimplement at every call-site for Producer::send (and any code that awaits the returned Future) and 2) it's impossible to do this without losing idempotency on retries.

2. That said, I wonder if a pluggable interface is really the right call here. Documenting the interactions of a producer with a ClientExceptionHandler instance will be tricky, and implementing them will also be a fair amount of work.
I believe that there needs to be some more granularity for how writes to non-existent topics (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled, but I'm torn between keeping it simple with maybe one or two new producer config properties, or a full-blown pluggable interface. If there are more cases that would benefit from a pluggable interface, it would be nice to identify these and add them to the KIP to strengthen the motivation. Right now, I'm not sure the two that are mentioned in the motivation are sufficient.

3. Alternatively, a possible compromise is for this KIP to introduce new properties that dictate how to handle unknown-topic-partition and record-too-large errors, with the thinking that if we introduce a pluggable interface later on, these properties will be recognized by the default implementation of that interface but could be completely ignored or replaced by alternative implementations.

4. (Nit) You can remove the "This page is meant as a template for writing a KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the possibility for retry logic? The simplest version of this could be to add a RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for the ClientExceptionHandlerResponse enum, since they cause records to be dropped.
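Several suggestions in this thread (Matthias's split of `CONTINUE` into `RETRY` and `SWALLOW`, and Chris's points 5 and 6) amount to a three-valued response. Below is a minimal sketch of how such a handler might look. It assumes hypothetical names (`HandlerResponse`, `ProducerExceptionHandler`, `TimeBoundedUnknownTopicHandler`) that are not the KIP's API. To stay self-contained, it uses stand-in exception classes instead of importing kafka-clients. The example handler also explores one possible answer to Omnia's temporary-versus-permanent question by bounding retries of unknown-topic errors by elapsed time. That is an assumption for illustration, not something the KIP specifies.

```java
import java.time.Duration;
import java.time.Instant;

public class HandlerSketch {

    // Hypothetical three-valued response: RETRY keeps the producer's internal
    // retry loop going, SWALLOW drops the record, FAIL surfaces the error.
    enum HandlerResponse { RETRY, SWALLOW, FAIL }

    // Stand-ins for Kafka's UnknownTopicOrPartitionException and
    // RecordTooLargeException so the sketch compiles without kafka-clients.
    static class UnknownTopicOrPartitionException extends RuntimeException { }
    static class RecordTooLargeException extends RuntimeException { }

    // Hypothetical handler interface, not the API proposed in KIP-1038.
    // The caller passes in when the record first failed and the current time.
    interface ProducerExceptionHandler {
        HandlerResponse handle(Exception exception, Instant firstFailure, Instant now);
    }

    // Retries unknown-topic errors until a deadline (the topic may still be
    // being created), drops oversized records, and fails on everything else.
    static class TimeBoundedUnknownTopicHandler implements ProducerExceptionHandler {
        private final Duration maxUnknownTopicWait;

        TimeBoundedUnknownTopicHandler(Duration maxUnknownTopicWait) {
            this.maxUnknownTopicWait = maxUnknownTopicWait;
        }

        @Override
        public HandlerResponse handle(Exception exception, Instant firstFailure, Instant now) {
            if (exception instanceof UnknownTopicOrPartitionException) {
                Duration waited = Duration.between(firstFailure, now);
                return waited.compareTo(maxUnknownTopicWait) < 0
                        ? HandlerResponse.RETRY
                        : HandlerResponse.FAIL;
            }
            if (exception instanceof RecordTooLargeException) {
                return HandlerResponse.SWALLOW;
            }
            return HandlerResponse.FAIL;
        }
    }

    public static void main(String[] args) {
        ProducerExceptionHandler handler = new TimeBoundedUnknownTopicHandler(Duration.ofSeconds(30));
        Instant start = Instant.parse("2024-05-13T08:00:00Z");
        System.out.println(handler.handle(new UnknownTopicOrPartitionException(), start, start.plusSeconds(5)));  // RETRY
        System.out.println(handler.handle(new UnknownTopicOrPartitionException(), start, start.plusSeconds(60))); // FAIL
        System.out.println(handler.handle(new RecordTooLargeException(), start, start));                         // SWALLOW
        System.out.println(handler.handle(new IllegalStateException("other"), start, start));                    // FAIL
    }
}
```

Passing `firstFailure` and `now` explicitly keeps the handler free of clocks and easy to test. The sketch leaves two questions open: whether the producer would track per-record first-failure times, and how a handler returning `RETRY` would interact with existing settings such as `retries` and `delivery.timeout.ms`.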

Cheers,

Chris

On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

I echo what Omnia says: I'm not sure I understand the implications of the change, and I think more detail is needed.

This comment also confused me a bit:
* {@code ClientExceptionHandler} that continues the transaction even if a record is too large.
* Otherwise, it makes the transaction to fail.

Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for the RecordTooLargeException (and a few other errors), we want to give a new error category for this error that allows the application to choose how it is handled. Maybe this KIP is something that you are looking for?
Stay tuned :)

Justine

On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Alieh,
Thanks for the KIP! I have a couple of comments:
- You mentioned in the KIP motivation:
  > Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop.

  How can the handler differentiate between something that is temporary, where it should keep retrying, and something permanent, like forgetting to create the topic?
  Temporary cases here could be:
  - the producer gets deployed before topic creation finishes (especially if topic creation is handled via IaC)
  - temporarily offline partitions
  - leadership changes
  Isn't this putting the producer at risk of dropping records unintentionally?
- Can you elaborate on what is written in the compatibility / migration plan section, please, by explaining in a bit more detail what the changed behaviour is and how it will impact clients who are upgrading?
- In the Proposed Changes section, can you elaborate on where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler will get triggered, and how the producer will be configured to point to a custom implementation?

Thanks
Omnia

On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,
Here is KIP-1038: Add Custom Error Handler to Producer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer

I look forward to your feedback!

Cheers,
Alieh
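Matthias and Omnia both ask how a custom handler would be wired into the producer: through a new config, through the constructor, or both. Kafka clients already accept some pluggable classes by name (for example `partitioner.class`). One possible shape, sketched here under assumptions, is a class-name config in which an explicitly supplied instance takes precedence. The `client.exception.handler` key, the `ExceptionHandler` interface, and `resolveHandler` are all hypothetical and are not part of the KIP.

```java
import java.util.Properties;

public class HandlerConfigSketch {

    // Hypothetical handler interface; responses are plain strings to keep the sketch short.
    public interface ExceptionHandler {
        String handle(Exception exception);
    }

    // Default that preserves today's behaviour by always failing.
    public static class DefaultExceptionHandler implements ExceptionHandler {
        @Override
        public String handle(Exception exception) {
            return "FAIL";
        }
    }

    // Hypothetical config key, not an existing Kafka producer property.
    static final String HANDLER_CLASS_CONFIG = "client.exception.handler";

    // An instance passed in directly (e.g. via a constructor) takes precedence;
    // otherwise the configured class is instantiated through its public no-arg
    // constructor, and with no config at all the default handler is used.
    static ExceptionHandler resolveHandler(Properties props, ExceptionHandler explicit)
            throws ReflectiveOperationException {
        if (explicit != null) {
            return explicit;
        }
        String className = props.getProperty(HANDLER_CLASS_CONFIG);
        if (className == null) {
            return new DefaultExceptionHandler();
        }
        Class<? extends ExceptionHandler> clazz =
                Class.forName(className).asSubclass(ExceptionHandler.class);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Properties props = new Properties();
        // No config and no instance: fall back to the default.
        System.out.println(resolveHandler(props, null).handle(new RuntimeException())); // FAIL
        // An explicitly supplied instance wins over config.
        ExceptionHandler dropEverything = e -> "SWALLOW";
        System.out.println(resolveHandler(props, dropEverything).handle(new RuntimeException())); // SWALLOW
        // A class named in config is instantiated reflectively.
        props.setProperty(HANDLER_CLASS_CONFIG, DefaultExceptionHandler.class.getName());
        System.out.println(resolveHandler(props, null).getClass().getSimpleName()); // DefaultExceptionHandler
    }
}
```

Falling back to a handler that always fails matches Matthias's point that the default behaviour should not change unless a user opts in.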