Oh I see. The type isn't the error type but a newly defined type for the response. Makes sense and works for me.
Justine

On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com> wrote:
> If we have dedicated methods for each kind of exception (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that provide sufficient constraint? I'm not suggesting we eliminate these methods, just that we change their return types to something more flexible.
>
> On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid> wrote:
>
> > I'm not sure I agree with the Retriable and NonRetriableResponse comment. This doesn't limit the blast radius or enforce that certain errors are used. I think we might disagree on how controlled these interfaces can be...
> >
> > Justine
> >
> > On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> >
> > > Hi Alieh,
> > >
> > > Thanks for the updates! I just have a few more thoughts:
> > >
> > > - I don't think a boolean property is sufficient to dictate retries for unknown topic partitions, though. These errors can occur if a topic has just been created, which can happen if, for example, automatic topic creation is enabled for a multi-task connector. This is why I proposed a timeout instead of a boolean (and see my previous email for why reducing max.block.ms for a producer is not a viable alternative). If it helps, one way to reproduce this yourself is to add the line `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
> > > https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> > > and then check the logs afterward for messages like "Error while fetching metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
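Below is a minimal sketch of one way to read Chris's suggestion above. It keeps a dedicated method per exception, such as handleRecordTooLarge and handleUnknownTopicOrPartition, but has them return shared response types.

Everything here is hypothetical rather than the KIP's actual API:

- The exception and record classes are local stand-ins, so the example compiles without kafka-clients.
- The RetriableResponse/NonRetriableResponse split is an assumption about what "more flexible" return types could look like.
- ImportantTopicsHandler is an invented example policy.

The design point it illustrates is that RETRY does not exist for an oversized record. The compiler therefore rejects it, instead of the producer quietly reinterpreting it as FAIL.

```java
import java.io.Closeable;
import java.util.Set;

// Local stand-ins for org.apache.kafka.common.errors.* and ProducerRecord
// (assumption: kafka-clients is not on the classpath for this sketch).
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

final class SketchRecord {
    final String topic;
    final byte[] value;

    SketchRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Responses for errors that may succeed if retried.
enum RetriableResponse { FAIL, RETRY, SWALLOW }

// Responses for errors that can never succeed: RETRY is not representable at all.
enum NonRetriableResponse { FAIL, SWALLOW }

// Hypothetical handler shape: one method per covered exception, each defaulting to
// today's behaviour (FAIL), so implementors override only the cases they need.
interface ProducerExceptionHandlerSketch extends Closeable {
    default NonRetriableResponse handleRecordTooLarge(RecordTooLargeException e, SketchRecord record) {
        return NonRetriableResponse.FAIL;
    }

    default RetriableResponse handleUnknownTopicOrPartition(UnknownTopicOrPartitionException e, SketchRecord record) {
        return RetriableResponse.FAIL;
    }

    @Override
    default void close() {}
}

// Invented example policy: drop oversized records; keep retrying missing partitions
// only for topics the application considers important, and fail for all others.
final class ImportantTopicsHandler implements ProducerExceptionHandlerSketch {
    private final Set<String> importantTopics;

    ImportantTopicsHandler(Set<String> importantTopics) {
        this.importantTopics = importantTopics;
    }

    @Override
    public NonRetriableResponse handleRecordTooLarge(RecordTooLargeException e, SketchRecord record) {
        return NonRetriableResponse.SWALLOW;
    }

    @Override
    public RetriableResponse handleUnknownTopicOrPartition(UnknownTopicOrPartitionException e, SketchRecord record) {
        return importantTopics.contains(record.topic) ? RetriableResponse.RETRY : RetriableResponse.FAIL;
    }
}
```

The default methods keep today's behaviour for anything an implementor does not override.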
> > >
> > > - I also don't think we need custom XxxResponse enums for every possible method; it seems like this will lead to a lot of duplication and cognitive overhead if we want to expand the error handler in the future. Something more flexible like RetriableResponse and NonRetriableResponse could suffice.
> > >
> > > - Finally, the KIP still doesn't state how the handler will or won't take precedence over existing retry properties. If I set `retries` or `delivery.timeout.ms` or `max.block.ms` to low values, will that cause retries to cease even if my custom handler would otherwise keep returning RETRY for an error?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > >
> > > > Hi Alieh,
> > > >
> > > > Just a few more comments on the KIP. It is looking much less risky now that the scope is tighter.
> > > >
> > > > [AJS1] It would be nice to have default implementations of the handle methods so an implementor would not need to implement both themselves.
> > > >
> > > > [AJS2] Producer configurations which are class names usually end in “.class”. I suggest “custom.exception.handler.class”.
> > > >
> > > > [AJS3] If I implemented a handler, and I set a non-default value for one of the new configurations, what happens? I would expect that the handler takes precedence. I wasn’t quite clear what “the control will follow the handler instructions” meant.
> > > >
> > > > [AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse, I don’t think you need to state in the comment for ProducerExceptionHandler that RETRY will be interpreted as FAIL.
> > > >
> > > > Thanks,
> > > > Andrew
> > > >
> > > > > On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for the very interesting discussion during my PTO.
> > > > >
> > > > > KIP updates and addressing concerns:
> > > > >
> > > > > 1) Two handle() methods are defined in ProducerExceptionHandler for the two exceptions, with different input parameters, so that we have handle(RecordTooLargeException e, ProducerRecord record) and handle(UnknownTopicOrPartitionException e, ProducerRecord record).
> > > > >
> > > > > 2) The ProducerExceptionHandler extends `Closeable` as well.
> > > > >
> > > > > 3) The KIP suggests having two more configuration parameters with boolean values:
> > > > >
> > > > > - `drop.invalid.large.records`, with a default value of `false`, for swallowing too-large records.
> > > > >
> > > > > - `retry.unknown.topic.partition`, with a default value of `true`, which performs RETRY for `max.block.ms` ms when encountering the UnknownTopicOrPartitionException.
> > > > >
> > > > > Hope the main concerns are addressed so that we can go forward with voting.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Alieh
> > > > >
> > > > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > >> Hi Matthias,
> > > > >>
> > > > >>> [AL1] While I see the point, I would think having a different callback for every exception might not really be elegant?
> > > > >>
> > > > >> I'm not sure how to assess the level of elegance of the proposal, but I can comment on the technical characteristics:
> > > > >>
> > > > >> 1. Having specific interfaces that codify the logic that is currently prescribed in the comments reduces the chance of making a mistake. Comments may get ignored, misunderstood, etc., but if the contract is codified, the compiler will help to enforce the contract.
> > > > >> 2. Given that the logic is trickier than it seems (the record-too-large case is an example that can easily confuse someone who's not intimately familiar with the nuances of the batching logic), having a few more hoops to jump through would give a greater chance that whoever tries to add new cases pauses and thinks a bit more.
> > > > >> 3. As Justine pointed out, having different methods will be a forcing function to go through a KIP rather than smuggle new cases through implementation.
> > > > >> 4. Sort of a consequence of the previous 3 -- all those things reduce the chance of someone writing code that works with 2 errors and then, when more errors are added in the future, suddenly and incorrectly ignores the new errors (the example I gave in the previous email).
> > > > >>
> > > > >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app, and drops messages
> > > > >>
> > > > >> I agree that there is always a chance to get a bug and lose messages, but there is generally a separation of concerns with different risk profiles: the filtering logic may be more rigorously tested and rarely changed (say, an application developer does it), but setting the topics to produce to may be done via configuration (e.g., a user of the application does it), and it's generally an expectation that users would get an error when the configuration is incorrect.
> > > > >>
> > > > >> What could be worse is that UnknownTopicOrPartitionException can be an intermittent error, i.e., with a generally correct configuration, there could be a metadata propagation problem on the cluster, and then a random set of records could get lost.
> > > > >>
> > > > >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?
> > > > >>
> > > > >> It achieves the same result but solves it differently. My proposal:
> > > > >>
> > > > >> 1. Application checks the validity of a record (maybe via a new validateRecord method) before producing it, and can just exclude it or return an error to the user.
> > > > >> 2. Application produces the record -- at this point there are no records that could return record too large; they were either skipped at step 1, or we didn't get here because step 1 failed.
> > > > >>
> > > > >> Vs. the KIP's proposal:
> > > > >>
> > > > >> 1. Application produces the record.
> > > > >> 2. Application gets a callback.
> > > > >> 3. Application returns the action on how to proceed.
> > > > >>
> > > > >> The advantage of the former is the clarity of semantics -- the record is invalid (a property of the record, not a function of server state or server configuration), and we can clearly know that it is the record that is bad and can never succeed.
> > > > >>
> > > > >> The KIP-proposed way actually has a very tricky point: it only handles a subset of record-too-large exceptions. The broker can return record-too-large and reject the whole batch (but we don't want to ignore those, because then we could skip random records that just happened to be in the same batch); in some sense, we use the same error for 2 different conditions, and understanding that requires a pretty deep understanding of Kafka internals.
> > > > >>
> > > > >> -Artem
> > > > >>
> > > > >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > > > >>
> > > > >>> My concern with respect to it being fragile: the code that ensures the error type is internal to the producer. Someone may see it and say, "I want to add such and such error. This looks like internal code, so I don't need a KIP," and then they can change it to whatever they want, thinking it is within the typical Kafka improvement protocol.
> > > > >>>
> > > > >>> Relying on an internal change to enforce an external API is fragile in my opinion. That's why I sort of agreed with Artem on enforcing the error in the method signature -- part of the public API.
> > > > >>>
> > > > >>> Chris's comments on requiring more information for the handler again make me wonder if we are solving a problem of lack of information at the application level with a more powerful solution than we need. (I.e., if we had more information, could the application close and restart the transaction rather than having to drop records?) But I am happy to compromise with a handler that we can agree is sufficiently controlled and documented.
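Artem's alternative is to validate a record before producing it, perhaps via a new validateRecord method. It can be approximated on the application side today.

The sketch below illustrates this under stated assumptions and is not a producer API:

- RecordValidator and its methods are invented.
- The limit is assumed to mirror the producer's max.request.size setting.
- Payloads are assumed to be already serialized.

The estimate counts only key, value, and header bytes and ignores record and batch framing, so it is a lower bound. If even the lower bound exceeds the limit, the record can never succeed. A record just under the limit may still be rejected by the producer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical application-side pre-check in the spirit of "validate before producing".
// Not a Kafka API. Keys, values, and header values are assumed to be already serialized.
final class RecordValidator {
    // Assumed to mirror the producer's max.request.size (bytes).
    private final int maxRecordBytes;

    RecordValidator(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Payload bytes only (key + value + header keys/values). Real records add framing
    // overhead on top, so this is a lower bound on the size the producer will check.
    static int estimatedSize(byte[] key, byte[] value, Map<String, byte[]> headers) {
        int size = (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            size += header.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += header.getValue() == null ? 0 : header.getValue().length;
        }
        return size;
    }

    // True only when even the lower bound exceeds the limit: such a record can never succeed.
    boolean isDefinitelyTooLarge(byte[] key, byte[] value, Map<String, byte[]> headers) {
        return estimatedSize(key, value, headers) > maxRecordBytes;
    }

    // Step 1 of the proposal: set invalid values aside (to report back to the user)
    // so that only plausible records ever reach send().
    List<byte[]> partition(List<byte[]> values, List<byte[]> rejected) {
        List<byte[]> accepted = new ArrayList<>();
        for (byte[] value : values) {
            if (isDefinitelyTooLarge(null, value, Map.of())) {
                rejected.add(value);
            } else {
                accepted.add(value);
            }
        }
        return accepted;
    }
}
```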
> > > > >>>
> > > > >>> Justine
> > > > >>>
> > > > >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > >>>
> > > > >>>> Hi Alieh,
> > > > >>>>
> > > > >>>> Continuing prior discussions:
> > > > >>>>
> > > > >>>> 1) Regarding the "flexibility" discussion, my overarching point is that I don't see the point in allowing for this kind of pluggable logic without also covering more scenarios. Take example 2 in the KIP: if we're going to implement retries only on "important" topics when a topic partition isn't found, why wouldn't we also want to be able to do this for other errors? Again, taking authorization errors as an example, why wouldn't we want to be able to fail when we can't write to "important" topics because the producer principal lacks sufficient ACLs, and drop the record if the topic isn't "important"? In a security-conscious environment with runtime-dependent topic routing (which is a common feature of many source connectors, such as the Debezium connectors), this seems fairly likely.
> > > > >>>>
> > > > >>>> 2) As far as changing the shape of the API goes, I like Artem's idea of splitting out the interface based on specific exceptions. This may be a little laborious to expand in the future, but if we really want to limit the exceptions that we cover with the handler and move slowly and cautiously, then IMO it'd be reasonable to reflect that in the interface. I also acknowledge that there's no way to completely prevent people from shooting themselves in the foot by implementing the API incorrectly, but I think it's worth it to do what we can--including leveraging the Java language's type system--to help them, so IMO there's value in eliminating the implicit behavior of failing when a policy returns RETRY for a non-retriable error. This can take a variety of shapes and I'm not going to insist on anything specific, but I do want to again raise my concerns with the current proposal and request that we find something a little better.
> > > > >>>>
> > > > >>>> 3) Concerning the default implementation--actually, I meant what I wrote :) I don't want a "second" default, I want an implementation of this interface to be used as the default if no others are specified. The behavior of this default implementation would be identical to existing behavior (so there would be no backwards compatibility concerns like the ones raised by Matthias), but it would be possible to configure this default handler class to behave differently for a basic set of scenarios. This would mirror (pun intended) the approach we've taken with Mirror Maker 2 and its ReplicationPolicy interface [1]. There is a default implementation available [2] that recognizes a handful of basic configuration properties [3] for simple tweaks, but if users want, they can also implement their own replication policy for more fine-grained logic if those properties aren't flexible enough.
> > > > >>>>
> > > > >>>> More concretely, I'm imagining something like this for the producer exception handler:
> > > > >>>>
> > > > >>>> - Default implementation class of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> > > > >>>> - This class would recognize two properties:
> > > > >>>>   - drop.invalid.large.records: Boolean property, defaults to false. If "false", then causes the handler to return FAIL whenever a RecordTooLargeException is encountered; if "true", then causes SWALLOW/SKIP/DROP to be returned instead
> > > > >>>>   - unknown.topic.partition.retry.timeout.ms: Integer property, defaults to INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered, causes the handler to return FAIL if that record has been pending for more than the retry timeout; otherwise, causes RETRY to be returned
> > > > >>>>
> > > > >>>> I think this is worth addressing now instead of later because it forces us to evaluate the usefulness of this interface, and it addresses a long-standing issue not just with Kafka Connect, but with the Java producer in general. For reference, here are a few tickets I collected after briefly skimming our Jira showing that this is a real pain point for users: https://issues.apache.org/jira/browse/KAFKA-10340, https://issues.apache.org/jira/browse/KAFKA-12990, https://issues.apache.org/jira/browse/KAFKA-13634. Although this is frequently reported with Kafka Connect, it applies to anyone who configures a producer to use a high retry timeout. I am aware of the max.block.ms property, but it's painful and IMO poor behavior to require users to reduce the value of this property just to handle the single scenario of trying to write to topics that don't exist, since it would also limit the retry timeout for other operations that are legitimately retriable.
> > > > >>>>
> > > > >>>> Raising new points:
> > > > >>>>
> > > > >>>> 5) I don't see the interplay between this handler and existing retry-related properties mentioned anywhere in the KIP. I'm assuming that properties like "retries", "max.block.ms", and "delivery.timeout.ms" would take precedence over the handler, and once they are exhausted, the record/batch will fail no matter what? If so, it's probably worth briefly mentioning this (no more than a sentence or two) in the KIP, and if not, I'm curious what you have in mind.
> > > > >>>>
> > > > >>>> 6) I also wonder if the API provides enough information in its current form. Would it be possible to provide handlers with some way of tracking how long a record has been pending for (i.e., how long it's been since the record was provided as an argument to Producer::send)? One way to do this could be to add a method like `onNewRecord(ProducerRecord)` and allow/require the handler to do its own bookkeeping, probably with a matching `onRecordSuccess(ProducerRecord)` method so that the handler doesn't eat up an ever-increasing amount of memory trying to track records. An alternative could be to include information about the initial time the record was received by the producer and the number of retries that have been performed for it as parameters in the handle method(s), but I'm not sure how easy this would be to implement, and it might clutter things up a bit too much.
> > > > >>>>
> > > > >>>> 7) A small request--can we add Closeable (or, if you prefer, AutoCloseable) as a superinterface for the handler interface?
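Chris's points 3 and 6 fit together. The timeout in unknown.topic.partition.retry.timeout.ms needs to know how long a record has been pending, and the onNewRecord/onRecordSuccess bookkeeping provides exactly that.

The sketch below is hypothetical throughout:

- DefaultProducerExceptionHandlerSketch stands in for the proposed DefaultProducerExceptionHandler.
- Records are identified by an opaque string id instead of a ProducerRecord.
- Time comes from an injected clock so the behaviour can be tested.

Only the two property names come from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

enum HandlerResponse { FAIL, RETRY, SWALLOW }

// Hypothetical sketch of the default handler described in the thread; not a kafka-clients class.
final class DefaultProducerExceptionHandlerSketch {
    static final String DROP_INVALID_LARGE_RECORDS = "drop.invalid.large.records";
    static final String UNKNOWN_TOPIC_RETRY_TIMEOUT_MS = "unknown.topic.partition.retry.timeout.ms";

    private final boolean dropInvalidLargeRecords;
    private final long unknownTopicRetryTimeoutMs;
    private final LongSupplier clockMs;
    // Point 6 bookkeeping: when each still-pending record was first seen.
    private final Map<String, Long> firstSeenMs = new HashMap<>();

    DefaultProducerExceptionHandlerSketch(Map<String, String> configs, LongSupplier clockMs) {
        // Defaults mirror existing behaviour: fail oversized records, retry unknown topics.
        this.dropInvalidLargeRecords =
                Boolean.parseBoolean(configs.getOrDefault(DROP_INVALID_LARGE_RECORDS, "false"));
        this.unknownTopicRetryTimeoutMs = Long.parseLong(
                configs.getOrDefault(UNKNOWN_TOPIC_RETRY_TIMEOUT_MS, String.valueOf(Integer.MAX_VALUE)));
        this.clockMs = clockMs;
    }

    void onNewRecord(String recordId) {
        firstSeenMs.putIfAbsent(recordId, clockMs.getAsLong());
    }

    // Forget completed records so the map does not grow without bound.
    void onRecordSuccess(String recordId) {
        firstSeenMs.remove(recordId);
    }

    HandlerResponse handleRecordTooLarge(String recordId) {
        firstSeenMs.remove(recordId); // both outcomes are terminal for this record
        return dropInvalidLargeRecords ? HandlerResponse.SWALLOW : HandlerResponse.FAIL;
    }

    HandlerResponse handleUnknownTopicOrPartition(String recordId) {
        long now = clockMs.getAsLong();
        // Start the clock on first sight if onNewRecord was never called for this record.
        firstSeenMs.putIfAbsent(recordId, now);
        long pendingMs = now - firstSeenMs.get(recordId);
        if (pendingMs > unknownTopicRetryTimeoutMs) {
            firstSeenMs.remove(recordId); // giving up on this record
            return HandlerResponse.FAIL;
        }
        return HandlerResponse.RETRY;
    }

    int pendingRecordCount() {
        return firstSeenMs.size();
    }
}
```

With no properties set, it fails oversized records and keeps retrying unknown topics, in line with the "identical to existing behavior" goal. How it should interact with retries, max.block.ms, and delivery.timeout.ms is the open question in point 5, and the sketch does not model it.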
> > > > >>>>
> > > > >>>> [1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> > > > >>>> [2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> > > > >>>> [3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG, https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>>
> > > > >>>> Chris
> > > > >>>>
> > > > >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>
> > > > >>>>> Very interesting discussion.
> > > > >>>>>
> > > > >>>>> It seems a central point is the question of "how generic" we approach this: some people think we need to be conservative, and others think we should try to be as generic as possible.
> > > > >>>>>
> > > > >>>>> Personally, I think following a limited scope for this KIP (by explicitly saying we only cover RecordTooLarge and UnknownTopicOrPartition) might be better. We have concrete tickets that we address, while for other exceptions (like authorization) we don't know if people want to handle them to begin with. Boiling the ocean might not get us too far, and being somewhat pragmatic might help to move this KIP forward. -- I also agree with Justine and Artem that we want to be careful anyway not to allow users to break stuff too easily.
> > > > >>>>>
> > > > >>>>> At the same time, I agree that we should set up this change in a forward-looking way, and thus having a single generic handler allows us to later extend the handler more easily. This should also simplify follow-up KIPs that might add new error cases (I actually mentioned one more to Alieh already, but we both agreed that it might be best to exclude it from the KIP right now, to make the 3.8 deadline. Doing a follow-up KIP is not the end of the world.)
> > > > >>>>>
> > > > >>>>> @Chris:
> > > > >>>>>
> > > > >>>>> (2) This sounds fair to me, but I'm not sure how "bad" it actually would be? If the contract is clearly defined, what the KIP proposes seems to be fine, and given that such a handler is an expert API, and we can provide "best practices" (cf. my other comment below in [AL1]), being a little bit pragmatic sounds fine to me.
> > > > >>>>>
> > > > >>>>> Not sure if I understand Justine's argument on this question?
> > > > >>>>>
> > > > >>>>> (3) About having a default impl or not: I am fine with adding one, even if I am not sure why Connect could not just add its own one and plug it in (and we would add corresponding configs for Connect, but not for the producer itself)? For this case, we could also do this as a follow-up KIP, but I'm happy to include it in this KIP to provide value to Connect right away (even if the value might not come right away if we miss the 3.8 deadline due to the expanded KIP scope...) -- For KS, we would for sure plug in our own impl, and lock down the config such that users cannot set their own handler on the internal producer to begin with. Might be good to elaborate on why the producer should have a default? We might actually want to add this to the KIP right away?
> > > > >>>>>
> > > > >>>>> The key for a default impl would be to not change the current behavior, and having no default seems to achieve this. For the two cases you mentioned, it's unclear to me what default value for the "upper bound on retries" for UnknownTopicOrPartitionException we should set? It seems it would need to be the same as `delivery.timeout.ms`? However, if users have actually overridden `delivery.timeout.ms`, we would need to set this config somewhat "dynamically"? Is this feasible? If we hard-code 2 minutes, it might not be backward compatible. I have the impression we might introduce some undesired coupling? -- For the "record too large" case, the config seems to be boolean, and setting it to `false` by default seems to provide backward compatibility.
> > > > >>>>>
> > > > >>>>> @Artem:
> > > > >>>>>
> > > > >>>>> [AL1] While I see the point, I would think having a different callback for every exception might not really be elegant? In the end, the handler is a very advanced feature anyway, and if it's implemented in a bad way, well, it's a user error -- we cannot protect users from everything. To me, a handler like this is to some extent "business logic", and if a user gets business logic wrong, it's hard to protect them. -- We would of course provide best-practice guidance in the JavaDocs, and explain that a handler should have explicit `if` statements for the cases it wants to handle, and only a single default which returns FAIL.
> > > > >>>>>
> > > > >>>>> [AL2] Yes, but for KS we would retry at the application layer. I.e., the TX is not completed; we clean up and set up our task from scratch, to ensure the pending transaction is completed before we resume. If the TX was indeed aborted, we would retry from an older offset and thus just hit the same error again, and the loop begins again.
> > > > >>>>>
> > > > >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app and drops messages, there is nothing we can do about it, and this handler, IMHO, has a similar purpose. This is also the line of thinking I apply to EOS, to address Justine's concern about "should we allow to drop for EOS", and my answer is "yes", because it's more business logic than actual error handling IMHO. And by default, we fail... So users opt in to add business logic to drop records. It's an application-level decision how to write the code.
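Matthias describes a best practice: explicit `if` statements for each case a handler wants to handle, plus a single default that returns FAIL. That can be contrasted with the catch-all pattern Artem warns about in [AL1].

This is a hypothetical sketch. All of the following are invented for illustration:

- the single generic handle method;
- the stand-in exception classes;
- SomeFutureError, which represents an exception type a later client version might start passing to the handler.

```java
import java.util.Set;

enum GenericResponse { FAIL, RETRY, SWALLOW }

// Local stand-ins, not the kafka-clients exception types.
class RecordTooLargeError extends RuntimeException {}

class UnknownTopicOrPartitionError extends RuntimeException {}

// Represents an exception type that a later client version might start passing to handlers.
class SomeFutureError extends RuntimeException {}

// Hypothetical single-method handler shape.
interface GenericExceptionHandler {
    GenericResponse handle(RuntimeException e, String topic);
}

// Recommended style: one explicit branch per case the handler means to handle,
// and a single default of FAIL for everything else, including future exceptions.
final class ExplicitHandler implements GenericExceptionHandler {
    private final Set<String> droppableTopics;

    ExplicitHandler(Set<String> droppableTopics) {
        this.droppableTopics = droppableTopics;
    }

    @Override
    public GenericResponse handle(RuntimeException e, String topic) {
        if (e instanceof RecordTooLargeError && droppableTopics.contains(topic)) {
            return GenericResponse.SWALLOW;
        }
        if (e instanceof UnknownTopicOrPartitionError) {
            return GenericResponse.RETRY;
        }
        return GenericResponse.FAIL;
    }
}

// The pattern from [AL1]: convenient while only two exceptions exist, but after an
// upgrade it silently swallows whatever new exception the handler starts receiving.
final class CatchAllHandler implements GenericExceptionHandler {
    @Override
    public GenericResponse handle(RuntimeException e, String topic) {
        return GenericResponse.SWALLOW;
    }
}
```

The two handlers differ most sharply when SomeFutureError arrives. That is the upgrade scenario per-exception methods are meant to prevent.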
> > > > >>>>>
> > > > >>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?
> > > > >>>>>
> > > > >>>>> @Justine:
> > > > >>>>>
> > > > >>>>>> I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.
> > > > >>>>>
> > > > >>>>> What do you mean by fragile? Not sure if I see your point.
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > > > >>>>>> Hi Alieh,
> > > > >>>>>>
> > > > >>>>>> Thanks for the KIP. The motivation talks about very specific cases, but the interface is generic.
> > > > >>>>>>
> > > > >>>>>> [AL1]
> > > > >>>>>> If the interface evolves in the future, I think we could have the following confusion:
> > > > >>>>>>
> > > > >>>>>> 1. A user implemented a SWALLOW action for both RecordTooLargeException and UnknownTopicOrPartitionException. For simplicity, they just return SWALLOW from the function, because it elegantly handles all known cases.
> > > > >>>>>> 2. The interface has evolved to support a new exception.
> > > > >>>>>> 3. The user has upgraded their Kafka client.
> > > > >>>>>>
> > > > >>>>>> Now a new kind of error gets dropped on the floor without the user's intention, and it would be super hard to detect and debug.
> > > > >>>>>>
> > > > >>>>>> To avoid the confusion, I think we should use handlers for specific exceptions. Then, if a new exception is added, it won't get silently swallowed, because the user would need to add new functionality to handle it.
> > > > >>>>>>
> > > > >>>>>> I also have some higher-level comments:
> > > > >>>>>>
> > > > >>>>>> [AL2]
> > > > >>>>>>> it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop
> > > > >>>>>>
> > > > >>>>>> If the TimeoutException happens during transactional processing (exactly-once is the desired semantics), then the client should not retry when it gets a TimeoutException, because without knowing the reason for TimeoutExceptions, the client cannot know whether the message actually got produced or not, and retrying the message may result in duplicates.
> > > > >>>>>>
> > > > >>>>>>> The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata
> > > > >>>>>>
> > > > >>>>>> Maybe we should fix the error handling and return the proper underlying message? Then the application can properly handle the message based on preferences.
> > > > >>>>>>
> > > > >>>>>> From the product perspective, it's not clear how safe it is to blindly ignore UnknownTopicOrPartitionException. This could lead to situations where a simple typo could lead to massive data loss (part of the data would effectively be produced to a "black hole", and the user may not notice it for a while).
> > > > >>>>>>
> > > > >>>>>> In which situations would you recommend that the user "black hole" messages in case of misconfiguration?
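Artem suggests returning the proper underlying cause rather than a bare TimeoutException. That would let an application branch on the root cause itself.

The sketch below is hypothetical:

- The two exception classes are stand-ins, and the classification labels are invented.
- Whether and how the real producer chains the cause is exactly what is being discussed here, so nothing in the sketch describes current client behaviour.

It walks the standard Throwable.getCause() chain, with a depth limit as a guard against cyclic chains.

```java
import java.util.Optional;

// Stand-ins; not the kafka-clients TimeoutException / UnknownTopicOrPartitionException.
class SketchTimeoutException extends RuntimeException {
    SketchTimeoutException(String message, Throwable cause) {
        super(message, cause);
    }
}

class SketchUnknownTopicException extends RuntimeException {
    SketchUnknownTopicException(String message) {
        super(message);
    }
}

final class RootCauses {
    private static final int MAX_DEPTH = 32;

    private RootCauses() {}

    // Returns the first throwable in the cause chain (starting with t itself) of the given type.
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Illustrative labels: a missing topic is often a configuration problem worth
    // reporting to the user, unlike a transient timeout.
    static String classify(Throwable t) {
        if (findCause(t, SketchUnknownTopicException.class).isPresent()) {
            return "unknown-topic";
        }
        if (findCause(t, SketchTimeoutException.class).isPresent()) {
            return "timeout";
        }
        return "other";
    }
}
```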
> > > > >>>>>>
> > > > >>>>>> [AL3]
> > > > >>>>>>
> > > > >>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException,
> > > > >>>>>>
> > > > >>>>>> Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW RecordTooLargeExceptions that happen because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.
> > > > >>>>>>
> > > > >>>>>> -Artem
> > > > >>>>>>
> > > > >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > > > >>>>>>
> > > > >>>>>>> Alieh and Chris,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for clarifying 1), but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.
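Justine asks about an `if` statement in front of the handler and why that could be fragile. This is easier to see in code.

The sketch below is hypothetical and does not reproduce the KIP's sample code. InternalGuard, the stand-in exception classes, and the exact-class matching are all assumptions.

It shows the two rules that, in this design, live only in internal code:

- which exceptions reach the user's handler at all;
- the implicit downgrade of RETRY to FAIL for an error that cannot succeed.

```java
import java.util.Set;

enum GuardResponse { FAIL, RETRY, SWALLOW }

// Stand-ins for the two exceptions the handler is meant to cover.
class TooLargeStandIn extends RuntimeException {}

class UnknownTopicStandIn extends RuntimeException {}

interface GuardedHandler {
    GuardResponse handle(RuntimeException e);
}

// Hypothetical producer-internal dispatch in front of a user handler.
final class InternalGuard {
    // Exceptions forwarded to the handler (exact class match, for simplicity).
    private static final Set<Class<? extends RuntimeException>> HANDLED =
            Set.of(TooLargeStandIn.class, UnknownTopicStandIn.class);
    // Exceptions that can never succeed on retry.
    private static final Set<Class<? extends RuntimeException>> NON_RETRIABLE =
            Set.of(TooLargeStandIn.class);

    private InternalGuard() {}

    static GuardResponse dispatch(RuntimeException e, GuardedHandler handler) {
        if (!HANDLED.contains(e.getClass())) {
            return GuardResponse.FAIL; // the handler is never consulted for anything else
        }
        GuardResponse response = handler.handle(e);
        // Implicit rule: RETRY for an error that cannot succeed is treated as FAIL.
        if (response == GuardResponse.RETRY && NON_RETRIABLE.contains(e.getClass())) {
            return GuardResponse.FAIL;
        }
        return response;
    }
}
```

Adding a class to HANDLED would change what user handlers receive without touching any public signature. That is the concern raised in the thread.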
> > > > >>>>>>> > > > > >>>>>>> Can you clarify what you mean by `since the code does not > reach > > > > >> the > > > > >>> KS > > > > >>>>>>> interface and breaks somewhere in producer.` If we surfaced > > this > > > > >>> error > > > > >>>>> to > > > > >>>>>>> the application in a better way, would that also be a solution > > to > > > > >> the > > > > >>>>> issue? > > > > >>>>>>> > > > > >>>>>>> Justine > > > > >>>>>>> > > > > >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi > > > > >>>>> <asae...@confluent.io.invalid> > > > > >>>>>>> wrote: > > > > >>>>>>> > > > > >>>>>>>> Hi, > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> Thank you, Chris and Justine, for the feedback. > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> @Chris > > > > >>>>>>>> > > > > >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is > the > > > one > > > > >>> you > > > > >>>>>>>> mentioned. We are going to cover more exceptions in the > > future, > > > > >> but > > > > >>>> as > > > > >>>>>>>> Justine mentioned, we must be very conservative about adding > > > more > > > > >>>>>>>> exceptions. Additionally, flexibility mainly means that the > > user > > > > >> is > > > > >>>>> able > > > > >>>>>>> to > > > > >>>>>>>> develop their own code. As mentioned in the motivation > section > > > > >> and > > > > >>>> the > > > > >>>>>>>> examples, sometimes the user decides on dropping a record > > based > > > > >> on > > > > >>>> the > > > > >>>>>>>> topic, for example. > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> 2) Defining two separate methods for retriable and > > non-retriable > > > > >>>>>>>> exceptions: although the idea is brilliant, the user may > still > > > > >>> make a > > > > >>>>>>>> mistake by implementing the wrong method and see > > unexpected > > > > >>>>>>> behaviour.
> > > > >>>>>>>> For example, he may implement handleRetriable() for > > > > >>>>>>> RecordTooLargeException > > > > >>>>>>>> and define SWALLOW for the exception, but in practice, he > sees > > > no > > > > >>>>> change > > > > >>>>>>> in > > > > >>>>>>>> default behaviour since he implemented the wrong method. I > > think > > > > >> we > > > > >>>> can > > > > >>>>>>>> never reduce the user’s mistakes to 0. > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> 3) Default implementation for Handler: the default behaviour > > is > > > > >>>> already > > > > >>>>>>>> preserved with NO need of implementing any handler or > setting > > > the > > > > >>>>>>>> corresponding config parameter `custom.exception.handler`. > > What > > > > >> you > > > > >>>>> mean > > > > >>>>>>> is > > > > >>>>>>>> actually having a second default, which requires having both > > > > >>>> interface > > > > >>>>>>> and > > > > >>>>>>>> config parameters. About UnknownTopicOrPartitionException: > the > > > > >>>> producer > > > > >>>>>>>> already offers the config parameter `max.block.ms` which > > > > >>> determines > > > > >>>>> the > > > > >>>>>>>> duration of retrying. The main purpose of the user who needs > > the > > > > >>>>> handler > > > > >>>>>>> is > > > > >>>>>>>> to get the root cause of TimeoutException and handle it in > the > > > > >> way > > > > >>> he > > > > >>>>>>>> intends. The KIP explains the necessity of it for KS users. > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the > > > error, > > > > >>>> while > > > > >>>>>>>> SKIP means skip the record, I think. If it makes sense for > > more > > > > >>> ppl, > > > > >>>> I > > > > >>>>>>> can > > > > >>>>>>>> change it to SKIP > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> @Justine > > > > >>>>>>>> > > > > >>>>>>>> 1) was addressed by Chris. > > > > >>>>>>>> > > > > >>>>>>>> 2 and 3) The problem is exactly what you mentioned. 
> Currently, > > > > >>> there > > > > >>>> is > > > > >>>>>>> no > > > > >>>>>>>> way to handle these issues application-side. Even KS users > who > > > > >>>>> implement > > > > >>>>>>> KS > > > > >>>>>>>> ProductionExceptionHandler are not able to handle the > > exceptions > > > > >> as > > > > >>>>> they > > > > >>>>>>>> intend since the code does not reach the KS interface and > > breaks > > > > >>>>>>> somewhere > > > > >>>>>>>> in producer. > > > > >>>>>>>> > > > > >>>>>>>> Cheers, > > > > >>>>>>>> Alieh > > > > >>>>>>>> > > > > >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton < > > > > >>>> fearthecel...@gmail.com> > > > > >>>>>>>> wrote: > > > > >>>>>>>> > > > > >>>>>>>>> Hi Justine, > > > > >>>>>>>>> > > > > >>>>>>>>> The method signatures for the interface are indeed > > open-ended, > > > > >> but > > > > >>>> the > > > > >>>>>>>> KIP > > > > >>>>>>>>> states that its uses will be limited. See the motivation > > > > >> section: > > > > >>>>>>>>> > > > > >>>>>>>>>> We believe that the user should be able to develop custom > > > > >>> exception > > > > >>>>>>>>> handlers for managing producer exceptions. On the other > hand, > > > > >> this > > > > >>>>> will > > > > >>>>>>>> be > > > > >>>>>>>>> an expert-level API, and using that may result in strange > > > > >>> behaviour > > > > >>>> in > > > > >>>>>>>> the > > > > >>>>>>>>> system, making it hard to find the root cause. Therefore, > the > > > > >>> custom > > > > >>>>>>>>> handler is currently limited to handling > > > RecordTooLargeException > > > > >>> and > > > > >>>>>>>>> UnknownTopicOrPartitionException. > > > > >>>>>>>>> > > > > >>>>>>>>> Cheers, > > > > >>>>>>>>> > > > > >>>>>>>>> Chris > > > > >>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan > > > > >>>>> <jols...@confluent.io.invalid > > > > >>>>>>>> > > > > >>>>>>>>> wrote: > > > > >>>>>>>>> > > > > >>>>>>>>>> Hi Alieh, > > > > >>>>>>>>>> > > > > >>>>>>>>>> I was out for KSB and then was also sick. 
:( > > > > >>>>>>>>>> > > > > >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two > > > > >>> specific > > > > >>>>>>>>>> scenarios, since the interface accepts a generic > Exception e > > > > >> and > > > > >>>> can > > > > >>>>>>> be > > > > >>>>>>>>>> implemented to check if that e is an instanceof any > > exception. > > > > >> I > > > > >>>>>>> didn't > > > > >>>>>>>>> see > > > > >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit > > > concerned > > > > >>>> about > > > > >>>>>>>>> this > > > > >>>>>>>>>> actually. I'm concerned about the open-endedness and the > > > > >>> contract > > > > >>>>>>> we > > > > >>>>>>>>> have > > > > >>>>>>>>>> with transactions. We are allowing the client to make > > > decisions > > > > >>>> that > > > > >>>>>>>> are > > > > >>>>>>>>>> somewhat invisible to the server. As an aside, can we > build > > in > > > > >>> log > > > > >>>>>>>>> messages > > > > >>>>>>>>>> when the handler decides to skip (etc.) a message? I'm really > > > > >>>> concerned > > > > >>>>>>>>> about > > > > >>>>>>>>>> messages being silently dropped. > > > > >>>>>>>>>> > > > > >>>>>>>>>> I do think Chris's point 2) about retriable vs non-retriable > > > > >>> errors > > > > >>>>>>> is > > > > >>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic > or > > > > >>>> partition > > > > >>>>>>>>>> exception too early, as there are cases where it can be > > > > >>> transient. > > > > >>>>>>>>>> > > > > >>>>>>>>>> I'm still a little bit wary of allowing dropping records > as > > > > >> part > > > > >>> of > > > > >>>>>>> EOS > > > > >>>>>>>>>> generally, as in many cases, these errors signify an issue > > with > > > > >>> the > > > > >>>>>>>>> original > > > > >>>>>>>>>> data.
I understand that streams and connect/mirror maker > may > > > > >> have > > > > >>>>>>>> reasons > > > > >>>>>>>>>> they want to progress past these messages, but wondering > if > > > > >> there > > > > >>>> is > > > > >>>>>>> a > > > > >>>>>>>>> way > > > > >>>>>>>>>> that can be done application-side. I'm willing to accept > > this > > > > >>> sort > > > > >>>> of > > > > >>>>>>>>>> proposal if we can make it clear that this sort of thing > is > > > > >>>> happening > > > > >>>>>>>> and > > > > >>>>>>>>>> we limit the blast radius for what we can do. > > > > >>>>>>>>>> > > > > >>>>>>>>>> Justine > > > > >>>>>>>>>> > > > > >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton > > > > >>>> <chr...@aiven.io.invalid > > > > >>>>>>>> > > > > >>>>>>>>>> wrote: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> Hi Alieh, > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have > some > > > > >>>> thoughts > > > > >>>>>>>>> that > > > > >>>>>>>>>>> I'd like to see addressed before voting. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable > > > interface, > > > > >>> why > > > > >>>>>>>> are > > > > >>>>>>>>> we > > > > >>>>>>>>>>> only limiting the uses for this interface to two very > > > specific > > > > >>>>>>>>> scenarios? > > > > >>>>>>>>>>> Why not also allow, e.g., authorization errors to be > > handled > > > > >> as > > > > >>>>>>> well > > > > >>>>>>>>>>> (allowing users to drop records destined for some > > off-limits > > > > >>>>>>> topics, > > > > >>>>>>>> or > > > > >>>>>>>>>>> retry for a limited duration in case there's a delay in > the > > > > >>>>>>>> propagation > > > > >>>>>>>>>> of > > > > >>>>>>>>>>> ACL updates)? 
It'd be nice to see some analysis of other > > > > >> errors > > > > >>>>>>> that > > > > >>>>>>>>>> could > > > > >>>>>>>>>>> be handled with this new API, both to avoid the follow-up > > > work > > > > >>> of > > > > >>>>>>>>> another > > > > >>>>>>>>>>> KIP to address them in the future, and to make sure that > > > we're > > > > >>> not > > > > >>>>>>>>>> painting > > > > >>>>>>>>>>> ourselves into a corner with the current API in a way > that > > > > >> would > > > > >>>>>>> make > > > > >>>>>>>>>>> future modifications difficult. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 2) Something feels a bit off with how retriable vs. > > > > >>> non-retriable > > > > >>>>>>>>> errors > > > > >>>>>>>>>>> are handled with the interface. Why not introduce two > > > separate > > > > >>>>>>>> methods > > > > >>>>>>>>> to > > > > >>>>>>>>>>> handle each case separately? That way there's no > ambiguity > > or > > > > >>>>>>>> implicit > > > > >>>>>>>>>>> behavior when, e.g., attempting to retry on a > > > > >>>>>>>> RecordTooLargeException. > > > > >>>>>>>>>> This > > > > >>>>>>>>>>> could be something like `NonRetriableResponse > > > > >>>>>>> handle(ProducerRecord, > > > > >>>>>>>>>>> Exception)` and `RetriableResponse > > > > >>> handleRetriable(ProducerRecord, > > > > >>>>>>>>>>> Exception)`, though the exact names and shape can > obviously > > > be > > > > >>>>>>> toyed > > > > >>>>>>>>>> with a > > > > >>>>>>>>>>> bit. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may > > > > >> benefit > > > > >>>>>>> some > > > > >>>>>>>>>>> users' custom producer applications and Kafka Streams > > > > >>>> applications, > > > > >>>>>>>> it > > > > >>>>>>>>>>> comes at significant deployment cost for other > low-/no-code > > > > >>>>>>>>> environments, > > > > >>>>>>>>>>> including but not limited to Kafka Connect and > MirrorMaker > > 2. 
> > > > >>> Can > > > > >>>>>>> we > > > > >>>>>>>>> add > > > > >>>>>>>>>> a > > > > >>>>>>>>>>> default implementation of the exception handler that > allows > > > > >> for > > > > >>>>>>> some > > > > >>>>>>>>>> simple > > > > >>>>>>>>>>> behavior to be tweaked via configuration property? Two > > things > > > > >>> that > > > > >>>>>>>>> would > > > > >>>>>>>>>> be > > > > >>>>>>>>>>> nice to have would be A) an upper bound on the retry time > > for > > > > >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to > drop > > > > >>>> records > > > > >>>>>>>>> that > > > > >>>>>>>>>>> are large enough to trigger a record-too-large exception. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of > the > > > > >>>> proposed > > > > >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, > > > > >>> especially > > > > >>>>>>>> when > > > > >>>>>>>>>>> trying to guess the behavior for retriable errors. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Cheers, > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Chris > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi > > > > >>>>>>>>>> <asae...@confluent.io.invalid > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>> Hi all, > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> A summary of the KIP and the discussions: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> The KIP introduces a handler interface for Producer in > > order > > > > >> to > > > > >>>>>>>>> handle > > > > >>>>>>>>>>> two > > > > >>>>>>>>>>>> exceptions: RecordTooLargeException and > > > > >>>>>>>>>> UnknownTopicOrPartitionException. > > > > >>>>>>>>>>>> The handler handles the exceptions per-record. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> - Do we need this handler? 
[Motivation and Examples > > > > >> sections] > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the > producer > > > > >>>>>>> collects > > > > >>>>>>>>>>> multiple > > > > >>>>>>>>>>>> records in batches. Then a RecordTooLargeException > related > > > > >> to a > > > > >>>>>>>>> single > > > > >>>>>>>>>>>> record leads to failing the entire batch. A custom > > exception > > > > >>>>>>>> handler > > > > >>>>>>>>> in > > > > >>>>>>>>>>>> this case may decide on dropping the record and > continuing > > > > >> the > > > > >>>>>>>>>>> processing. > > > > >>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a > > > > >> record > > > > >>>>>>>> that > > > > >>>>>>>>> is > > > > >>>>>>>>>>> too > > > > >>>>>>>>>>>> large is a poison pill record, and there is no way to > skip > > > > >> over > > > > >>>>>>>> it. A > > > > >>>>>>>>>>>> handler would allow us to react to this error inside the > > > > >>>>>>> producer, > > > > >>>>>>>>>> i.e., > > > > >>>>>>>>>>>> local to where the error happens, and thus simplify the > > > > >> overall > > > > >>>>>>>> code > > > > >>>>>>>>>>>> significantly. Please read the Motivation section for > more > > > > >>>>>>>>> explanation. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the > > > producer > > > > >>>>>>>> handles > > > > >>>>>>>>>>> this > > > > >>>>>>>>>>>> exception internally and only issues a WARN log about > > > missing > > > > >>>>>>>>> metadata > > > > >>>>>>>>>>> and > > > > >>>>>>>>>>>> retries internally. Later, when the producer hits "delivery.timeout.ms", > > > > >>>>>>>>>>> it > > > > >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly > > > > >> retry, > > > > >>>>>>>> which > > > > >>>>>>>>>> may > > > > >>>>>>>>>>>> result in an infinite retry loop.
The thrown > > > TimeoutException > > > > >>>>>>>> "cuts" > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>> connection to the underlying root cause of missing > > metadata > > > > >>>>>>> (which > > > > >>>>>>>>>> could > > > > >>>>>>>>>>>> indeed be a transient error but is persistent for a > > > > >>> non-existing > > > > >>>>>>>>>> topic). > > > > >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite > > > > >> retry > > > > >>>>>>>> loop. > > > > >>>>>>>>>>> Kafka > > > > >>>>>>>>>>>> Streams also blindly retries for this case, and the > > > > >> application > > > > >>>>>>>> gets > > > > >>>>>>>>>>> stuck. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> - Having an interface vs. a configuration option: [Motivation, > > > > >>>>>>> Examples, > > > > >>>>>>>>> and > > > > >>>>>>>>>>>> Rejected Alternatives sections] > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Our solution introduces an interface due to the full > > > > >>>>>>>> flexibility > > > > >>>>>>>>>> that > > > > >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams > ones, > > > > >>>>>>>> determine > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>> handler's behaviour based on the situation. For > example, facing > > > > >>>>>>>>>>>> UnknownTopicOrPartitionException, the user may > > want > > > > >> to > > > > >>>>>>>> raise > > > > >>>>>>>>> an > > > > >>>>>>>>>>>> error for some topics but retry it for other topics. > > Having > > > a > > > > >>>>>>>>>>> configuration > > > > >>>>>>>>>>>> option with a fixed set of possibilities does not serve > > the > > > > >>>>>>> user's > > > > >>>>>>>>>>>> needs. See Example 2, please.
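Example 2 itself is not reproduced in this thread, so the following is only a minimal sketch, under stated assumptions, of the per-topic policy described above: retry an unknown topic for some topics, raise an error for others. `Response`, `PendingRecord`, `UnknownTopicError`, `ProducerErrorHandler`, and `PerTopicUnknownTopicHandler` are hypothetical stand-ins so the snippet compiles on its own. The real exception is Kafka's `UnknownTopicOrPartitionException`, and the KIP's handler receives a `ProducerRecord`.

```java
import java.util.Set;

/** Hypothetical handler responses, mirroring the options discussed in the KIP. */
enum Response { FAIL, RETRY, SWALLOW }

/** Minimal stand-in for a producer record; only the topic matters here. */
final class PendingRecord {
    final String topic;
    final byte[] value;

    PendingRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Stand-in for org.apache.kafka.common.errors.UnknownTopicOrPartitionException. */
class UnknownTopicError extends RuntimeException {
    UnknownTopicError(String message) {
        super(message);
    }
}

/** Hypothetical handler shape: one decision per record and exception. */
interface ProducerErrorHandler {
    Response handle(PendingRecord record, Exception error);
}

/**
 * Per-topic policy: topics expected to appear on the fly (e.g. through
 * automatic topic creation) are retried; for any other topic a missing topic
 * is treated as a misconfiguration and fails fast instead of looping.
 */
final class PerTopicUnknownTopicHandler implements ProducerErrorHandler {
    private final Set<String> retryableTopics;

    PerTopicUnknownTopicHandler(Set<String> retryableTopics) {
        this.retryableTopics = Set.copyOf(retryableTopics);
    }

    @Override
    public Response handle(PendingRecord record, Exception error) {
        if (error instanceof UnknownTopicError) {
            return retryableTopics.contains(record.topic) ? Response.RETRY : Response.FAIL;
        }
        // Errors this handler does not recognise are never swallowed: fail.
        return Response.FAIL;
    }
}
```

With `Set.of("auto-created-topic")`, a record for `auto-created-topic` would be retried while a record for a misspelled topic fails fast, which avoids the silent "black hole" case raised in [AL2]. Bounding how long the retries go on would need extra state (for example, a first-seen timestamp per record) that this sketch does not track.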
> > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces > > > > >> section] > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> If the custom handler decides on SWALLOW for > > > > >>>>>>>> RecordTooLargeException, > > > > >>>>>>>>>>> then > > > > >>>>>>>>>>>> this record will not be a part of the batch of > > transactions > > > > >> and > > > > >>>>>>>> will > > > > >>>>>>>>>> also > > > > >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So > no > > > > >>>>>>> worries > > > > >>>>>>>>>> about > > > > >>>>>>>>>>>> getting a RecordTooLargeException from the broker in > this > > > > >> case, > > > > >>>>>>> as > > > > >>>>>>>>> the > > > > >>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW > > means > > > > >>> drop > > > > >>>>>>>> the > > > > >>>>>>>>>>> record > > > > >>>>>>>>>>>> and continue/swallow the error. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> - What if the handle() method implements RETRY for > > > > >>>>>>>>>>> RecordTooLargeException? > > > > >>>>>>>>>>>> [Proposed Changes section] > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW > for > > > > >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal > to > > > > >> FAIL. > > > > >>>>>>>> This > > > > >>>>>>>>> is > > > > >>>>>>>>>>>> well documented/informed in javadoc. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> - What if the handle() method of the handler throws an > > > > >>> exception? > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> The handler is expected to have correct code. If it > throws > > > an > > > > >>>>>>>>>> exception, > > > > >>>>>>>>>>>> everything fails. 
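The response rules in the two notes on RecordTooLargeException can be summarised as a small mapping. This is a hedged sketch only: `HandlerResponse`, `Outcome`, and `ResponseResolver.resolve` are hypothetical names, not the code in the PoC PR. A handler that throws is not modelled here; per the summary, that exception would simply propagate and the send fails.

```java
/** Hypothetical handler responses, named as in the KIP discussion. */
enum HandlerResponse { FAIL, RETRY, SWALLOW }

/** Hypothetical action the producer takes for the affected record. */
enum Outcome { DROP_RECORD, RETRY_LATER, FAIL_SEND }

final class ResponseResolver {
    private ResponseResolver() {}

    /**
     * Maps a handler response to an action for one record.
     * SWALLOW drops only this record, so it is never added to a batch or sent.
     * For a RecordTooLargeException, RETRY is treated as FAIL: the producer's
     * configuration cannot change at runtime, so a retry could never succeed.
     */
    static Outcome resolve(boolean recordTooLarge, HandlerResponse response) {
        switch (response) {
            case SWALLOW:
                return Outcome.DROP_RECORD;
            case RETRY:
                return recordTooLarge ? Outcome.FAIL_SEND : Outcome.RETRY_LATER;
            default: // FAIL
                return Outcome.FAIL_SEND;
        }
    }
}
```

Keeping the mapping in one place also gives a natural spot for the log line that Justine asked for whenever a record is dropped: the `DROP_RECORD` branch.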
> > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846> > > > > >>>>>>> ONLY > > > > >>>>>>>>> for > > > > >>>>>>>>>>>> RecordTooLargeException. The code changes related to > > > > >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this > PR > > > > >>> LATER. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Looking forward to your feedback again. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Cheers, > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Alieh > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True < > > > > >> k...@kirktrue.pro> > > > > >>>>>>>>> wrote: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>>> Hi Alieh, > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Thanks for the updates! > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Comments inline... > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi > > > > >>>>>>>>>>> <asae...@confluent.io.INVALID > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Hi all, > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Thanks a lot for the constructive feedback! > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Addressing some of the main concerns: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by > broker, > > > > >>>>>>>> producer > > > > >>>>>>>>>> and > > > > >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler` > > > > >> interface > > > > >>>>>>>> is > > > > >>>>>>>>>>>>> introduced > > > > >>>>>>>>>>>>>> to affect only the exceptions thrown from the producer.
> > > > >> This > > > > >>>>>>>> KIP > > > > >>>>>>>>>> very > > > > >>>>>>>>>>>>>> specifically means to provide a possibility to manage > > the > > > > >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the > > Producer.send() > > > > >>>>>>>> method. > > > > >>>>>>>>>>>> Please > > > > >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I > > > > >>>>>>> investigated > > > > >>>>>>>>> the > > > > >>>>>>>>>>>> issue > > > > >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern > > about > > > > >> how > > > > >>>>>>>> we > > > > >>>>>>>>>>> handle > > > > >>>>>>>>>>>>> the > > > > >>>>>>>>>>>>>> errors as well. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are > > > called > > > > >>>>>>>> when > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>> record > > > > >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not > > the > > > > >>>>>>>> desired > > > > >>>>>>>>>>> time > > > > >>>>>>>>>>>>> for > > > > >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions > > beforehand. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when > > > > >>>>>>> Callback > > > > >>>>>>>> is > > > > >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - What if the custom handler returns RETRY for > > > > >>>>>>>>>>>>> `RecordTooLargeException`? I > > > > >>>>>>>>>>>>>> assume changing the producer configuration at runtime > is > > > > >>>>>>>>> possible. > > > > >>>>>>>>>> If > > > > >>>>>>>>>>>> so, > > > > >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in > > the > > > > >>>>>>> next > > > > >>>>>>>>>> try, > > > > >>>>>>>>>>>> the > > > > >>>>>>>>>>>>>> too large record is not poisoning any more. I am not > > 100% > > > > >>>>>>> sure > > > > >>>>>>>>>> about > > > > >>>>>>>>>>>> the > > > > >>>>>>>>>>>>>> technical details, though. 
Otherwise, we can consider > > the > > > > >>>>>>> RETRY > > > > >>>>>>>>> as > > > > >>>>>>>>>>> FAIL > > > > >>>>>>>>>>>>> for > > > > >>>>>>>>>>>>>> this exception. Another solution would be to consider > a > > > > >>>>>>>> constant > > > > >>>>>>>>>>> number > > > > >>>>>>>>>>>>> of > > > > >>>>>>>>>>>>>> times for RETRY which can be useful for other > exceptions > > > as > > > > >>>>>>>> well. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> It’s not presently possible to change the configuration > > of > > > > >> an > > > > >>>>>>>>>> existing > > > > >>>>>>>>>>>>> Producer at runtime. So if a record hits a > > > > >>>>>>>> RecordTooLargeException > > > > >>>>>>>>>>> once, > > > > >>>>>>>>>>>> no > > > > >>>>>>>>>>>>> amount of retrying (with the current Producer) will > > change > > > > >>> that > > > > >>>>>>>>> fact. > > > > >>>>>>>>>>> So > > > > >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of > > > > >> RETRY > > > > >>>>>>> for > > > > >>>>>>>>> an > > > > >>>>>>>>>>>>> “oversized” record. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - What if the handle() method itself throws an > > exception? > > > I > > > > >>>>>>>> think > > > > >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be > > > exactly > > > > >>>>>>>> like > > > > >>>>>>>>>> when > > > > >>>>>>>>>>>> no > > > > >>>>>>>>>>>>>> custom handler is defined since the user actually did > > not > > > > >>>>>>> have > > > > >>>>>>>> a > > > > >>>>>>>>>>>> working > > > > >>>>>>>>>>>>>> handler. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is > the > > > > >> right > > > > >>>>>>>>>> choice. > > > > >>>>>>>>>>> It > > > > >>>>>>>>>>>>> then becomes a silent failure that might have > > > repercussions, > > > > >>>>>>>>>> depending > > > > >>>>>>>>>>> on > > > > >>>>>>>>>>>>> the business logic. 
A user would have to proactively > > trawl > > > > >>>>>>>> through > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though… > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - Why not use config parameters instead of an > interface? > > > As > > > > >>>>>>>>>> explained > > > > >>>>>>>>>>>> in > > > > >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that > the > > > > >>>>>>> handler > > > > >>>>>>>>>> will > > > > >>>>>>>>>>> be > > > > >>>>>>>>>>>>>> used for a greater number of exceptions in the future. > > > > >>>>>>>> Defining a > > > > >>>>>>>>>>>>>> configuration parameter for each exception may make > the > > > > >>>>>>>>>>> configuration a > > > > >>>>>>>>>>>>> bit > > > > >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is > weird > > > > >> and > > > > >>>>>>>>>> limiting. > > > > >>>>>>>>>>>>> Forget I ever suggested it ;) > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> I’d think additional background in the Motivation > section > > > > >>> would > > > > >>>>>>>>> help > > > > >>>>>>>>>> me > > > > >>>>>>>>>>>>> understand how users might use this feature beyond a) > > > > >> skipping > > > > >>>>>>>>>>>> “oversized” > > > > >>>>>>>>>>>>> records, and b) not retrying missing topics. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Small change: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for > > > brevity > > > > >>>>>>> and > > > > >>>>>>>>>>>>> simplicity. > > > > >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think! > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> The name change sounds good to me. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Thanks Alieh!
> > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> I thank you all again for your useful > > > > >> questions/suggestions. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as > > stated > > > > >> in > > > > >>>>>>>> some > > > > >>>>>>>>>>>>> feedback. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Cheers, > > > > >>>>>>>>>>>>>> Alieh > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan > > > > >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Thanks Alieh for the updates. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. > > It > > > > >>>>>>> seems > > > > >>>>>>>>>> like > > > > >>>>>>>>>>> we > > > > >>>>>>>>>>>>> want > > > > >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic > > > > >>>>>>> handler. > > > > >>>>>>>>>>>>>>> I think we tried to narrow down on the specific > errors > > we > > > > >>>>>>> want > > > > >>>>>>>>> to > > > > >>>>>>>>>>>>> handle, > > > > >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic > thing > > > > >> for > > > > >>>>>>>> two > > > > >>>>>>>>>>>> specific > > > > >>>>>>>>>>>>>>> errors. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to > > solve > > > > >>>>>>>> these > > > > >>>>>>>>>>>>> problems. I > > > > >>>>>>>>>>>>>>> agree though that we will need something more than > the > > > > >> error > > > > >>>>>>>>>> classes > > > > >>>>>>>>>>>> I'm > > > > >>>>>>>>>>>>>>> proposing if we want to have different handling be > > > > >>>>>>>> configurable. > > > > >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler > > means > > > > >>>>>>> that > > > > >>>>>>>> we > > > > >>>>>>>>>> are > > > > >>>>>>>>>>>>>>> creating more problems than we are solving. 
It is > still > > > > >>>>>>>> unclear > > > > >>>>>>>>> to > > > > >>>>>>>>>>> me > > > > >>>>>>>>>>>>> how > > > > >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could > > include > > > > >> an > > > > >>>>>>>>>> example? > > > > >>>>>>>>>>>> It > > > > >>>>>>>>>>>>>>> seems like there is a specific use case in mind and > > maybe > > > > >> we > > > > >>>>>>>> can > > > > >> >