I'm not sure I agree with the Retriable and NonRetriableResponse comment.
This doesn't limit the blast radius or enforce that certain errors are used.
I think we might disagree on how controlled these interfaces can be...
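
For illustration, here is a minimal sketch of what enforcing the error in the method signature could look like. Apart from RecordTooLargeExceptionResponse, which is mentioned in the thread, every name below is a hypothetical stand-in and not the KIP's final API; the topic is passed as a plain String so the sketch has no Kafka dependency.

```java
// Hypothetical stand-ins for the two Kafka exceptions under discussion.
class SketchRecordTooLargeException extends RuntimeException {}
class SketchUnknownTopicOrPartitionException extends RuntimeException {}

// One response enum per exception: RETRY is simply not expressible for a
// record that is too large, so the compiler rejects it.
enum RecordTooLargeExceptionResponse { FAIL, SWALLOW }
enum UnknownTopicOrPartitionExceptionResponse { FAIL, SWALLOW, RETRY }

// Only the exceptions named in a signature can ever reach the handler.
// The defaults are an assumption meant to mirror today's behaviour.
interface TypedProducerExceptionHandlerSketch {
    default RecordTooLargeExceptionResponse handle(
            SketchRecordTooLargeException e, String topic) {
        return RecordTooLargeExceptionResponse.FAIL;
    }

    default UnknownTopicOrPartitionExceptionResponse handle(
            SketchUnknownTopicOrPartitionException e, String topic) {
        return UnknownTopicOrPartitionExceptionResponse.RETRY;
    }
}

// Example implementation: drop oversized records only on "audit-" topics.
class DropOversizedAuditRecords implements TypedProducerExceptionHandlerSketch {
    @Override
    public RecordTooLargeExceptionResponse handle(
            SketchRecordTooLargeException e, String topic) {
        return topic.startsWith("audit-")
                ? RecordTooLargeExceptionResponse.SWALLOW
                : RecordTooLargeExceptionResponse.FAIL;
    }
}
```

With this shape, supporting a new exception means adding a new method to the public interface, which is the kind of change that would normally go through a KIP.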

Justine

On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Alieh,
>
> Thanks for the updates! I just have a few more thoughts:
>
> - I don't think a boolean property is sufficient to dictate retries for
> unknown topic partitions, though. These errors can occur if a topic has
> just been created, which can occur if, for example, automatic topic
> creation is enabled for a multi-task connector. This is why I proposed a
> timeout instead of a boolean (and see my previous email for why reducing
> max.block.ms for a producer is not a viable alternative). If it helps, one
> way to reproduce this yourself is to add the line
> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
>
> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> and then check the logs afterward for messages like "Error while fetching
> metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
>
> - I also don't think we need custom XxxResponse enums for every possible
> method; it seems like this will lead to a lot of duplication and cognitive
> overhead if we want to expand the error handler in the future. Something
> more flexible like RetriableResponse and NonRetriableResponse could
> suffice.
>
> - Finally, the KIP still doesn't state how the handler will or won't take
> precedence over existing retry properties. If I set `retries` or
> `delivery.timeout.ms` or `max.block.ms` to low values, will that cause
> retries to cease even if my custom handler would otherwise keep returning
> RETRY for an error?
>
> Cheers,
>
> Chris
>
> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi Alieh,
> > Just a few more comments on the KIP. It is looking much less risky now
> the
> > scope
> > is tighter.
> >
> > [AJS1] It would be nice to have default implementations of the handle
> > methods
> > so an implementor would not need to implement both themselves.
> >
> > [AJS2] Producer configurations which are class names usually end in
> > “.class”.
> > I suggest “custom.exception.handler.class”.
> >
> > [AJS3] If I implemented a handler, and I set a non-default value for one
> > of the
> > new configurations, what happens? I would expect that the handler takes
> > precedence. I wasn’t quite clear what “the control will follow the
> handler
> > instructions” meant.
> >
> > [AJS4] Because you now have an enum for the
> > RecordTooLargeExceptionResponse,
> > I don’t think you need to state in the comment for
> > ProducerExceptionHandler that
> > RETRY will be interpreted as FAIL.
> >
> > Thanks,
> > Andrew
> >
> > > On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID>
> > wrote:
> > >
> > > Hi all,
> > >
> > >
> > > Thanks for the very interesting discussion during my PTO.
> > >
> > >
> > > KIP updates and addressing concerns:
> > >
> > >
> > > 1) Two handle() methods are defined in ProducerExceptionHandler for the
> > two
> > > exceptions with different input parameters so that we have
> > > handle(RecordTooLargeException e, ProducerRecord record) and
> > > handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> > >
> > >
> > > 2) The ProducerExceptionHandler extends `Closeable` as well.
> > >
> > >
> > > 3) The KIP suggests having two more configuration parameters with
> boolean
> > > values:
> > >
> > > - `drop.invalid.large.records` with a default value of `false` for
> > > swallowing too large records.
> > >
> > > - `retry.unknown.topic.partition` with a default value of `true` that
> > > performs RETRY for `max.block.ms` ms when encountering the
> > > UnknownTopicOrPartitionException.
> > >
> > >
> > > Hope the main concerns are addressed so that we can go forward with
> > voting.
> > >
> > >
> > > Cheers,
> > >
> > > Alieh
> > >
> > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > >> Hi Mathias,
> > >>
> > >>> [AL1] While I see the point, I would think having a different
> callback
> > >> for every exception might not really be elegant?
> > >>
> > >> I'm not sure how to assess the level of elegance of the proposal, but
> I
> > can
> > >> comment on the technical characteristics:
> > >>
> > >> 1. Having specific interfaces that codify the logic that is currently
> > >> prescribed in the comments reduces the chance of making a mistake.
> > >> Comments may get ignored, misunderstood, etc., but if the contract is
> > >> codified, the compiler will help to enforce the contract.
> > >> 2. Given that the logic is trickier than it seems (the record-too-large
> > >> case is an example that can easily confuse someone who's not intimately
> > >> familiar with the nuances of the batching logic), having a few more hoops
> > >> to jump through would give a greater chance that whoever tries to add new
> > >> cases pauses and thinks a bit more.
> > >> 3. As Justine pointed out, having different methods will be a forcing
> > >> function to go through a KIP rather than smuggle new cases through
> > >> implementation.
> > >> 4. Sort of a consequence of the previous 3 -- all those things reduce
> > the
> > >> chance of someone writing the code that works with 2 errors and then
> > when
> > >> more errors are added in the future will suddenly incorrectly ignore
> new
> > >> errors (the example I gave in the previous email).
> > >>
> > >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > >> business logic. If a user puts a bad filter condition in their KS app,
> > and
> > >> drops messages
> > >>
> > >> I agree that there is always a chance to get a bug and lose messages,
> > >> but there is generally a separation of concerns with different risk
> > >> profiles:
> > >> the filtering logic may be more rigorously tested and rarely changed
> > (say
> > >> an application developer does it), but setting the topics to produce
> > may be
> > >> done via configuration (e.g. a user of the application does it) and
> it's
> > >> generally an expectation that users would get an error when
> > configuration
> > >> is incorrect.
> > >>
> > >> What could be worse is that UnknownTopicOrPartitionException can be an
> > >> intermittent error, i.e. with a generally correct configuration, there
> > >> could be a metadata propagation problem on the cluster and then a random
> > set
> > >> of records could get lost.
> > >>
> > >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking
> > the
> > >> size of the record upfront is exactly what the KIP proposes? No?
> > >>
> > >> It achieves the same result but solves it differently, my proposal:
> > >>
> > >> 1. Application checks the validity of a record (maybe via a new
> > >> validateRecord method) before producing it, and can just exclude it or
> > >> return an error to the user.
> > >> 2. Application produces the record -- at this point there are no
> records
> > >> that could return record too large, they were either skipped at step 1
> > or
> > >> we didn't get here because step 1 failed.
> > >>
> > >> Vs. KIP's proposal
> > >>
> > >> 1. Application produces the record.
> > >> 2. Application gets a callback.
> > >> 3. Application returns the action on how to proceed.
> > >>
> > >> The advantage of the former is the clarity of semantics -- the record
> is
> > >> invalid (property of the record, not a function of server state or
> > server
> > >> configuration) and we can clearly know that it is the record that is
> bad
> > >> and can never succeed.
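
A minimal sketch of the "validate first, then produce" flow described above. The validateRecord method is hypothetical (no such producer method exists today), and the size check is a simplification: real accounting would also cover headers, batch overhead, and compression.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class UpfrontValidationSketch {

    // Hypothetical local check: true if key plus value fit within the
    // producer's configured maximum request size (passed in here as an int).
    static boolean validateRecord(byte[] key, byte[] value, int maxRequestSize) {
        int keySize = key == null ? 0 : key.length;
        int valueSize = value == null ? 0 : value.length;
        return keySize + valueSize <= maxRequestSize;
    }

    // Step 1: filter out invalid records before anything is produced.
    // Step 2 (not shown): send only the returned values, none of which can
    // fail with a local record-too-large error under this simplified check.
    static List<String> selectSendable(List<String> values, int maxRequestSize) {
        List<String> sendable = new ArrayList<>();
        for (String value : values) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (validateRecord(null, bytes, maxRequestSize)) {
                sendable.add(value);
            }
        }
        return sendable;
    }
}
```

The decision to skip a record is then made by the application before the send, based only on properties of the record itself and local configuration.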
> > >>
> > >> The KIP-proposed way actually has a very tricky point: it actually
> > handles
> > >> a subset of record-too-large exceptions.  The broker can return
> > >> record-too-large and reject the whole batch (but we don't want to
> ignore
> > >> those because then we can skip random records that just happened to be
> > in
> > >> the same batch), in some sense we use the same error for 2 different
> > >> conditions, and understanding that requires a pretty deep understanding of
> > >> Kafka internals.
> > >>
> > >> -Artem
> > >>
> > >>
> > >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
> > <jols...@confluent.io.invalid
> > >>>
> > >> wrote:
> > >>
> > >>> My concern with respect to it being fragile: the code that ensures
> the
> > >>> error type is internal to the producer. Someone may see it and say, I
> > >> want
> > >>> to add such and such error. This looks like internal code, so I don't
> > >> need
> > >>> a KIP, and then they can change it to whatever they want thinking it
> is
> > >>> within the typical Kafka Improvement Proposal process.
> > >>>
> > >>> Relying on an internal change to enforce an external API is fragile
> in
> > my
> > >>> opinion. That's why I sort of agreed with Artem with enforcing the
> > error
> > >> in
> > >>> the method signature -- part of the public API.
> > >>>
> > >>> Chris's comments on providing more information to the handler again make
> > >>> me wonder if we are solving a problem of lack of information at the
> > >>> application level with a more powerful solution than we need. (Ie, if
> > we
> > >>> had more information, could the application close and restart the
> > >>> transaction rather than having to drop records) But I am happy to
> > >>> compromise with a handler that we can agree is sufficiently
> controlled
> > >> and
> > >>> documented.
> > >>>
> > >>> Justine
> > >>>
> > >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid
> >
> > >>> wrote:
> > >>>
> > >>>> Hi Alieh,
> > >>>>
> > >>>> Continuing prior discussions:
> > >>>>
> > >>>> 1) Regarding the "flexibility" discussion, my overarching point is
> > >> that I
> > >>>> don't see the point in allowing for this kind of pluggable logic
> > >> without
> > >>>> also covering more scenarios. Take example 2 in the KIP: if we're
> > going
> > >>> to
> > >>>> implement retries only on "important" topics when a topic partition
> > >> isn't
> > >>>> found, why wouldn't we also want to be able to do this for other
> > >> errors?
> > >>>> Again, taking authorization errors as an example, why wouldn't we
> want
> > >> to
> > >>>> be able to fail when we can't write to "important" topics because
> the
> > >>>> producer principal lacks sufficient ACLs, and drop the record if the
> > >>> topic
> > >>>> isn't "important"? In a security-conscious environment with
> > >>>> runtime-dependent topic routing (which is a common feature of many
> > >> source
> > >>>> connectors, such as the Debezium connectors), this seems fairly
> > likely.
> > >>>>
> > >>>> 2) As far as changing the shape of the API goes, I like Artem's idea
> > of
> > >>>> splitting out the interface based on specific exceptions. This may
> be
> > a
> > >>>> little laborious to expand in the future, but if we really want to
> > >>>> limit the exceptions that we cover with the handler and move slowly
> > and
> > >>>> cautiously, then IMO it'd be reasonable to reflect that in the
> > >>> interface. I
> > >>>> also acknowledge that there's no way to completely prevent people
> from
> > >>>> shooting themselves in the foot by implementing the API incorrectly,
> > >> but
> > >>> I
> > >>>> think it's worth it to do what we can--including leveraging the Java
> > >>>> language's type system--to help them, so IMO there's value to
> > >> eliminating
> > >>>> the implicit behavior of failing when a policy returns RETRY for a
> > >>>> non-retriable error. This can take a variety of shapes and I'm not
> > >> going
> > >>> to
> > >>>> insist on anything specific, but I do want to again raise my
> concerns
> > >>> with
> > >>>> the current proposal and request that we find something a little
> > >> better.
> > >>>>
> > >>>> 3) Concerning the default implementation--actually, I meant what I
> > >> wrote
> > >>> :)
> > >>>> I don't want a "second" default, I want an implementation of this
> > >>> interface
> > >>>> to be used as the default if no others are specified. The behavior
> of
> > >>> this
> > >>>> default implementation would be identical to existing behavior (so
> > >> there
> > >>>> would be no backwards compatibility concerns like the ones raised by
> > >>>> Matthias), but it would be possible to configure this default
> handler
> > >>> class
> > >>>> to behave differently for a basic set of scenarios. This would
> mirror
> > >>> (pun
> > >>>> intended) the approach we've taken with Mirror Maker 2 and its
> > >>>> ReplicationPolicy interface [1]. There is a default implementation
> > >>>> available [2] that recognizes a handful of basic configuration
> > >> properties
> > >>>> [3] for simple tweaks, but if users want, they can also implement
> > their
> > >>> own
> > >>>> replication policy for more fine-grained logic if those properties
> > >> aren't
> > >>>> flexible enough.
> > >>>>
> > >>>> More concretely, I'm imagining something like this for the producer
> > >>>> exception handler:
> > >>>>
> > >>>> - Default implementation class
> > >>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> > >>>> - This class would recognize two properties:
> > >>>>  - drop.invalid.large.records: Boolean property, defaults to false.
> If
> > >>>> "false", then causes the handler to return FAIL whenever
> > >>>> a RecordTooLargeException is encountered; if "true", then causes
> > >>>> SWALLOW/SKIP/DROP to be returned instead
> > >>>>  - unknown.topic.partition.retry.timeout.ms: Integer property,
> > >> defaults
> > >>>> to
> > >>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is
> encountered,
> > >>>> causes the handler to return FAIL if that record has been pending
> for
> > >>> more
> > >>>> than the retry timeout; otherwise, causes RETRY to be returned
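
A minimal sketch of the default handler described in the list above, under stated assumptions: the two property names are taken from the list, while the class, enum, and method names are hypothetical, and the pending time is passed in directly because the current API offers no way to obtain it (see point 6 below).

```java
import java.util.Map;

enum SketchHandlerResponse { FAIL, SWALLOW, RETRY }

class DefaultHandlerSketch {
    // Defaults chosen so that an unconfigured handler keeps existing behaviour.
    private boolean dropInvalidLargeRecords = false;
    private long unknownTopicRetryTimeoutMs = Integer.MAX_VALUE;

    // Reads the two properties from the client configs, if present.
    void configure(Map<String, ?> configs) {
        Object drop = configs.get("drop.invalid.large.records");
        if (drop != null) {
            dropInvalidLargeRecords = Boolean.parseBoolean(drop.toString());
        }
        Object timeout = configs.get("unknown.topic.partition.retry.timeout.ms");
        if (timeout != null) {
            unknownTopicRetryTimeoutMs = Long.parseLong(timeout.toString());
        }
    }

    // FAIL unless the user opted in to dropping oversized records.
    SketchHandlerResponse onRecordTooLarge() {
        return dropInvalidLargeRecords
                ? SketchHandlerResponse.SWALLOW
                : SketchHandlerResponse.FAIL;
    }

    // RETRY until the record has been pending longer than the timeout.
    SketchHandlerResponse onUnknownTopicOrPartition(long pendingMs) {
        return pendingMs > unknownTopicRetryTimeoutMs
                ? SketchHandlerResponse.FAIL
                : SketchHandlerResponse.RETRY;
    }
}
```

How this timeout would interact with `retries`, `max.block.ms`, and `delivery.timeout.ms` is left open here, as it is in the thread.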
> > >>>>
> > >>>> I think this is worth addressing now instead of later because it
> > forces
> > >>> us
> > >>>> to evaluate the usefulness of this interface and it addresses a
> > >>>> long-standing issue not just with Kafka Connect, but with the Java
> > >>> producer
> > >>>> in general. For reference, here are a few tickets I collected after
> > >>> briefly
> > >>>> skimming our Jira showing that this is a real pain point for users:
> > >>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> > >>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> > >>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
> > >>>> frequently reported with Kafka Connect, it applies to anyone who
> > >>> configures
> > >>>> a producer to use a high retry timeout. I am aware of the
> > max.block.ms
> > >>>> property, but it's painful and IMO poor behavior to require users to
> > >>> reduce
> > >>>> the value of this property just to handle the single scenario when
> > >> trying
> > >>>> to write to topics that don't exist, since it would also limit the
> > >> retry
> > >>>> timeout for other operations that are legitimately retriable.
> > >>>>
> > >>>> Raising new points:
> > >>>>
> > >>>> 5) I don't see the interplay between this handler and existing
> > >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming
> > >> that
> > >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms"
> > >>>> would
> > >>>> take precedence over the handler and once they are exhausted, the
> > >>>> record/batch will fail no matter what? If so, it's probably worth
> > >> briefly
> > >>>> mentioning this (no more than a sentence or two) in the KIP, and if
> > >> not,
> > >>>> I'm curious what you have in mind.
> > >>>>
> > >>>> 6) I also wonder if the API provides enough information in its
> current
> > >>>> form. Would it be possible to provide handlers with some way of
> > >> tracking
> > >>>> how long a record has been pending for (i.e., how long it's been
> since
> > >>> the
> > >>>> record was provided as an argument to Producer::send)? One way to do
> > >> this
> > >>>> could be to add a method like `onNewRecord(ProducerRecord)` and
> > >>>> allow/require the handler to do its own bookkeeping, probably with a
> > >>>> matching `onRecordSuccess(ProducerRecord)` method so that the
> handler
> > >>>> doesn't eat up an ever-increasing amount of memory trying to track
> > >>> records.
> > >>>> An alternative could be to include information about the initial
> time
> > >> the
> > >>>> record was received by the producer and the number of retries that
> > have
> > >>>> been performed for it as parameters in the handle method(s), but I'm
> > >> not
> > >>>> sure how easy this would be to implement and it might clutter things
> > >> up a
> > >>>> bit too much.
> > >>>>
> > >>>> 7) A small request--can we add Closeable (or, if you prefer,
> > >>> AutoCloseable)
> > >>>> as a superinterface for the handler interface?
> > >>>>
> > >>>> [1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> > >>>> [2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> > >>>> [3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
> > >>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> > >>>>
> > >>>> Cheers,
> > >>>>
> > >>>> Chris
> > >>>>
> > >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org>
> > >>> wrote:
> > >>>>
> > >>>>> Very interesting discussion.
> > >>>>>
> > >>>>> Seems a central point is the question about "how generic" we
> approach
> > >>>>> this, and some people think we need to be conservative and others
> > >> think
> > >>>>> we should try to be as generic as possible.
> > >>>>>
> > >>>>> Personally, I think following a limited scope for this KIP (by
> > >>>>> explicitly saying we only cover RecordTooLarge and
> > >>>>> UnknownTopicOrPartition) might be better. We have concrete tickets
> > >> that
> > >>>>> we address, while for other exceptions (like authorization) we don't
> > >>> know
> > >>>>> if people want to handle it to begin with. Boiling the ocean might
> > >> not
> > >>>>> get us too far, and being somewhat pragmatic might help to move
> this
> > >>> KIP
> > >>>>> forward. -- I also agree with Justine and Artem, that we want to be
> > >>>>> careful anyway to not allow users to break stuff too easily.
> > >>>>>
> > >>>>> At the same time, I agree that we should set up this change in a
> > >>>>> forward-looking way, and thus having a single generic handler allows us
> > >>>>> to later extend the handler more easily. This should also simplify
> > >>>>> follow-up KIPs that might add new error cases (I actually mentioned one more to
> > >> Alieh
> > >>>>> already, but we both agreed that it might be best to exclude it
> from
> > >>> the
> > >>>>> KIP right now, to make the 3.8 deadline. Doing a follow up KIP is
> not
> > >>>>> the end of the world.)
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> @Chris:
> > >>>>>
> > >>>>> (2) This sounds fair to me, but not sure how "bad" it actually
> would
> > >>> be?
> > >>>>> If the contract is clearly defined, it seems to be fine what the
> KIP
> > >>>>> proposes, and given that such a handler is an expert API, and we
> can
> > >>>>> provide "best practices" (cf my other comment below in [AL1]),
> being
> > >> a
> > >>>>> little bit pragmatic sounds fine to me.
> > >>>>>
> > >>>>> Not sure if I understand Justine's argument on this question?
> > >>>>>
> > >>>>>
> > >>>>> (3) About having a default impl or not. I am fine with adding one,
> > >> even
> > >>>>> if I am not sure why Connect could not just add its own one and
> plug
> > >> it
> > >>>>> in (and we would add corresponding configs for Connect, but not for
> > >> the
> > >>>>> producer itself)? For this case, we could also do this as a follow
> up
> > >>>>> KIP, but happy to include it in this KIP to provide value to
> Connect
> > >>>>> right away (even if the value might not come right away if we miss
> > >> the
> > >>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we would for
> > >>> sure
> > >>>>> plug in our own impl, and lock down the config such that users
> cannot
> > >>> set
> > >>>>> their own handler on the internal producer to begin with. Might be
> > >> good
> > >>>>> to elaborate why the producer should have a default? We might
> > >> actually
> > >>>>> want to add this to the KIP right away?
> > >>>>>
> > >>>>> The key for a default impl would be, to not change the current
> > >>> behavior,
> > >>>>> and having no default seems to achieve this. For the two cases you
> > >>>>> mentioned, it's unclear to me what default value on "upper bound on
> > >>>>> retries" for UnknownTopicOrPartitionException we should set? Seems it
> > >>>>> would need to be the same as `delivery.timeout.ms`? However, if
> > >> users
> > >>>>> have `delivery.timeout.ms` actually overwritten we would need to
> set
> > >>>>> this config somewhat "dynamic"? Is this feasible? If we hard-code 2
> > >>>>> minutes, it might not be backward compatible. I have the impression
> > >> we
> > >>>>> might introduce some undesired coupling? -- For the "record too
> > >> large"
> > >>>>> case, the config seems to be boolean and setting it to `false` by
> > >>>>> default seems to provide backward compatibility.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> @Artem:
> > >>>>>
> > >>>>> [AL1] While I see the point, I would think having a different
> > >> callback
> > >>>>> for every exception might not really be elegant? In the end, the
> > >>> handler
> > >>>>> is a very advanced feature anyway, and if it's implemented in a
> bad
> > >>>>> way, well, it's a user error -- we cannot protect users from
> > >>> everything.
> > >>>>> To me, a handler like this is to some extent "business logic" and
> > >> if a
> > >>>>> user gets business logic wrong, it's hard to protect them. -- We
> > >> would
> > >>>>> of course provide best practice guidance in the JavaDocs, and explain
> > >>>>> that a handler should have explicit `if` statements for stuff it wants
> > >>>>> to handle, and only a single default which returns FAIL.
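
A minimal sketch of that best-practice shape, assuming a single generic handle method. All types here are hypothetical stand-ins, not Kafka classes. Each handled case has an explicit `if`, and the only fallback is FAIL, so an exception type added in a later release is not silently dropped.

```java
enum StandInResponse { FAIL, SWALLOW, RETRY }

// Hypothetical stand-ins for the two exceptions covered by the KIP.
class StandInRecordTooLarge extends RuntimeException {}
class StandInUnknownTopic extends RuntimeException {}

class ExplicitIfHandlerSketch {

    static StandInResponse handle(Exception e) {
        if (e instanceof StandInRecordTooLarge) {
            // Explicit decision: drop records that can never be sent.
            return StandInResponse.SWALLOW;
        }
        if (e instanceof StandInUnknownTopic) {
            // Explicit decision: the topic may just have been created.
            return StandInResponse.RETRY;
        }
        // Single default: anything not explicitly handled fails, including
        // exception types introduced after this handler was written.
        return StandInResponse.FAIL;
    }
}
```

By contrast, a handler that returns SWALLOW unconditionally would start dropping records for any newly supported exception after a client upgrade.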
> > >>>>>
> > >>>>>
> > >>>>> [AL2] Yes, but for KS we would retry at the application layer. Ie,
> > >> the
> > >>>>> TX is not completed, we clean up and set up our task from scratch,
> to
> > >>>>> ensure the pending transaction is completed before we resume. If
> the
> > >> TX
> > >>>>> was indeed aborted, we would retry from older offset and thus just
> > >> hit
> > >>>>> the same error again and the loop begins again.
> > >>>>>
> > >>>>>
> > >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > >>>>> business logic. If a user puts a bad filter condition in their KS
> > >> app,
> > >>>>> and drops messages, there is nothing we can do about it, and this handler
> > >>>>> IMHO, has a similar purpose. This is also the line of thinking I
> > >> apply
> > >>>>> to EOS, to address Justine's concern about "should we allow to drop
> > >> for
> > >>>>> EOS", and my answer is "yes", because it's more business logic than
> > >>>>> actual error handling IMHO. And by default, we fail... So users
> > >> opt-in
> > >>>>> to add business logic to drop records. It's an application level
> > >>>>> decision how to write the code.
> > >>>>>
> > >>>>>
> > >>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> checking
> > >>> the
> > >>>>> size of the record upfront is exactly what the KIP proposes? No?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> @Justine:
> > >>>>>
> > >>>>>> I saw the sample
> > >>>>>> code -- is it just an if statement checking for the error before
> > >> the
> > >>>>>> handler is invoked? That seems a bit fragile.
> > >>>>>
> > >>>>> What do you mean by fragile? Not sure if I see your point.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > >>>>>> Hi Alieh,
> > >>>>>>
> > >>>>>> Thanks for the KIP.  The motivation talks about very specific
> > >> cases,
> > >>>> but
> > >>>>>> the interface is generic.
> > >>>>>>
> > >>>>>> [AL1]
> > >>>>>> If the interface evolves in the future I think we could have the
> > >>>>> following
> > >>>>>> confusion:
> > >>>>>>
> > >>>>>> 1. A user implemented SWALLOW action for both
> > >> RecordTooLargeException
> > >>>> and
> > >>>>>> UnknownTopicOrPartitionException.  For simplicity they just return
> > >>>> SWALLOW
> > >>>>>> from the function, because it elegantly handles all known cases.
> > >>>>>> 2. The interface has evolved to support a new exception.
> > >>>>>> 3. The user has upgraded their Kafka client.
> > >>>>>>
> > >>>>>> Now a new kind of error gets dropped on the floor without user's
> > >>>>> intention
> > >>>>>> and it would be super hard to detect and debug.
> > >>>>>>
> > >>>>>> To avoid the confusion, I think we should use handlers for
> specific
> > >>>>>> exceptions.  Then if a new exception is added it won't get
> silently
> > >>>>> swallowed
> > >>>>>> because the user would need to add new functionality to handle it.
> > >>>>>>
> > >>>>>> I also have some higher level comments:
> > >>>>>>
> > >>>>>> [AL2]
> > >>>>>>> it throws a TimeoutException, and the user can only blindly
> retry,
> > >>>> which
> > >>>>>> may result in an infinite retry loop
> > >>>>>>
> > >>>>>> If the TimeoutException happens during transactional processing
> > >>>> (exactly
> > >>>>>> once is the desired semantics), then the client should not retry
> > >> when
> > >>>> it
> > >>>>>> gets TimeoutException because without knowing the reason for
> > >>>>>> TimeoutExceptions, the client cannot know whether the message got
> > >>>>> actually
> > >>>>>> produced or not and retrying the message may result in
> duplicates.
> > >>>>>>
> > >>>>>>> The thrown TimeoutException "cuts" the connection to the
> > >> underlying
> > >>>> root
> > >>>>>> cause of missing metadata
> > >>>>>>
> > >>>>>> Maybe we should fix the error handling and return the proper
> > >>> underlying
> > >>>>>> message?  Then the application can properly handle the message
> > >> based
> > >>> on
> > >>>>>> preferences.
> > >>>>>>
> > >>>>>> From the product perspective, it's not clear how safe it is to
> > >>> blindly
> > >>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
> > >>> situations
> > >>>>>> when a simple typo could lead to massive data loss (part of the
> > >> data
> > >>>>> would
> > >>>>>> effectively be produced to a "black hole" and the user may not
> > >> notice
> > >>>> it
> > >>>>>> for a while).
> > >>>>>>
> > >>>>>> In which situations would you recommend the user to "black hole"
> > >>>> messages
> > >>>>>> in case of misconfiguration?
> > >>>>>>
> > >>>>>> [AL3]
> > >>>>>>
> > >>>>>>> If the custom handler decides on SWALLOW for
> > >>> RecordTooLargeException,
> > >>>>>>
> > >>>>>> Is my understanding correct that this KIP proposes functionality that
> > >>>>>> would only be able to SWALLOW RecordTooLargeExceptions that happen
> because
> > >>> the
> > >>>>>> producer cannot produce the record (if the broker rejects the
> > >> batch,
> > >>>> the
> > >>>>>> error won't get to the handler, because we cannot know which other
> > >>>>> records
> > >>>>>> get ignored).  In this case, why not just check the locally
> > >>> configured
> > >>>>> max
> > >>>>>> record size upfront and not produce the record in the first
> place?
> > >>>>> Maybe
> > >>>>>> we can expose a validation function from the producer that could
> > >>>> validate
> > >>>>>> the records locally, so we don't need to produce the record in
> > >> order
> > >>> to
> > >>>>>> know that it's invalid.
> > >>>>>>
> > >>>>>> -Artem
> > >>>>>>
> > >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> > >>>>> <jols...@confluent.io.invalid>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Alieh and Chris,
> > >>>>>>>
> > >>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just
> > >>>> didn't
> > >>>>>>> understand how that would be ensured on the producer side. I saw
> > >> the
> > >>>>> sample
> > >>>>>>> code -- is it just an if statement checking for the error before
> > >> the
> > >>>>>>> handler is invoked? That seems a bit fragile.
> > >>>>>>>
> > >>>>>>> Can you clarify what you mean by `since the code does not reach
> > >> the
> > >>> KS
> > >>>>>>> interface and breaks somewhere in producer.` If we surfaced this
> > >>> error
> > >>>>> to
> > >>>>>>> the application in a better way would that also be a solution to
> > >> the
> > >>>>> issue?
> > >>>>>>>
> > >>>>>>> Justine
> > >>>>>>>
> > >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> > >>>>> <asae...@confluent.io.invalid>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Thank you, Chris and Justine, for the feedback.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> @Chris
> > >>>>>>>>
> > >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the
> one
> > >>> you
> > >>>>>>>> mentioned. We are going to cover more exceptions in the future,
> > >> but
> > >>>> as
> > >>>>>>>> Justine mentioned, we must be very conservative about adding
> more
> > >>>>>>>> exceptions. Additionally, flexibility mainly means that the user
> > >> is
> > >>>>> able
> > >>>>>>> to
> > >>>>>>>> develop their own code. As mentioned in the motivation section
> > >> and
> > >>>> the
> > >>>>>>>> examples, sometimes the user decides on dropping a record based
> > >> on
> > >>>> the
> > >>>>>>>> topic, for example.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 2) Defining two separate methods for retriable and non-retriable
> > >>>>>>>> exceptions: although the idea is brilliant, the user may still
> > >>> make a
> > >>>>>>>> mistake by implementing the wrong method and see unexpected
> > >>>>>>> behaviour.
> > >>>>>>>> For example, he may implement handleRetriable() for
> > >>>>>>> RecordTooLargeException
> > >>>>>>>> and define SWALLOW for the exception, but in practice, he sees
> no
> > >>>>> change
> > >>>>>>> in
> > >>>>>>>> default behaviour since he implemented the wrong method. I think
> > >> we
> > >>>> can
> > >>>>>>>> never reduce the user’s mistakes to 0.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 3) Default implementation for Handler: the default behaviour is
> > >>>> already
> > >>>>>>>> preserved with NO need of implementing any handler or setting
> the
> > >>>>>>>> corresponding config parameter `custom.exception.handler`. What
> > >> you
> > >>>>> mean
> > >>>>>>> is
> > >>>>>>>> actually having a second default, which requires having both
> > >>>> interface
> > >>>>>>> and
> > >>>>>>>> config parameters. About UnknownTopicOrPartitionException: the
> > >>>> producer
> > >>>>>>>> already offers the config parameter `max.block.ms` which
> > >>> determines
> > >>>>> the
> > >>>>>>>> duration of retrying. The main purpose of the user who needs the
> > >>>>> handler
> > >>>>>>> is
> > >>>>>>>> to get the root cause of TimeoutException and handle it in the
> > >> way
> > >>> he
> > >>>>>>>> intends. The KIP explains the necessity of it for KS users.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the
> error,
> > >>>> while
> > >>>>>>>> SKIP means skip the record, I think. If it makes sense for more
> > >>> ppl,
> > >>>> I
> > >>>>>>> can
> > >>>>>>>> change it to SKIP
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> @Justine
> > >>>>>>>>
> > >>>>>>>> 1) was addressed by Chris.
> > >>>>>>>>
> > >>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently,
> > >>> there
> > >>>> is
> > >>>>>>> no
> > >>>>>>>> way to handle these issues application-side. Even KS users who
> > >>>>> implement
> > >>>>>>> KS
> > >>>>>>>> ProductionExceptionHandler are not able to handle the exceptions
> > >> as
> > >>>>> they
> > >>>>>>>> intend since the code does not reach the KS interface and breaks
> > >>>>>>> somewhere
> > >>>>>>>> in producer.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Alieh
> > >>>>>>>>
> > >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> > >>>> fearthecel...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Justine,
> > >>>>>>>>>
> > >>>>>>>>> The method signatures for the interface are indeed open-ended,
> > >> but
> > >>>> the
> > >>>>>>>> KIP
> > >>>>>>>>> states that its uses will be limited. See the motivation
> > >> section:
> > >>>>>>>>>
> > >>>>>>>>>> We believe that the user should be able to develop custom
> > >>> exception
> > >>>>>>>>> handlers for managing producer exceptions. On the other hand,
> > >> this
> > >>>>> will
> > >>>>>>>> be
> > >>>>>>>>> an expert-level API, and using that may result in strange
> > >>> behaviour
> > >>>> in
> > >>>>>>>> the
> > >>>>>>>>> system, making it hard to find the root cause. Therefore, the
> > >>> custom
> > >>>>>>>>> handler is currently limited to handling
> RecordTooLargeException
> > >>> and
> > >>>>>>>>> UnknownTopicOrPartitionException.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Chris
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> > >>>>> <jols...@confluent.io.invalid
> > >>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>
> > >>>>>>>>>> I was out for KSB and then was also sick. :(
> > >>>>>>>>>>
> > >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two
> > >>> specific
> > >>>>>>>>>> scenarios, since the interface accepts a generic Exception e
> > >> and
> > >>>> can
> > >>>>>>> be
> > >>>>>>>>>> implemented to check if that e is an instanceof any exception.
> > >> I
> > >>>>>>> didn't
> > >>>>>>>>> see
> > >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit
> concerned
> > >>>> about
> > >>>>>>>>> this
> > >>>>>>>>>> actually. I'm concerned about the open-endedness and the
> > >>> contract
> > >>>>>>> we
> > >>>>>>>>> have
> > >>>>>>>>>> with transactions. We are allowing the client to make
> decisions
> > >>>> that
> > >>>>>>>> are
> > >>>>>>>>>> somewhat invisible to the server. As an aside, can we build in
> > >>> log
> > >>>>>>>>> messages
> > >>>>>>>>>> when the handler decides to skip (etc.) a message? I'm really
> > >>>> concerned
> > >>>>>>>>> about
> > >>>>>>>>>> messages being silently dropped.
> > >>>>>>>>>>
> > >>>>>>>>>> I do think Chris's point 2) about retriable vs non retriable
> > >>> errors
> > >>>>>>> is
> > >>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic or
> > >>>> partition
> > >>>>>>>>>> exception too early, as there are cases where it can be
> > >>> transient.
> > >>>>>>>>>>
> > >>>>>>>>>> I'm still a little bit wary of allowing dropping records as
> > >> part
> > >>> of
> > >>>>>>> EOS
> > >>>>>>>>>> generally as in many cases, these errors signify an issue with
> > >>> the
> > >>>>>>>>> original
> > >>>>>>>>>> data. I understand that streams and connect/mirror maker may
> > >> have
> > >>>>>>>> reasons
> > >>>>>>>>>> they want to progress past these messages, but wondering if
> > >> there
> > >>>> is
> > >>>>>>> a
> > >>>>>>>>> way
> > >>>>>>>>>> that can be done application-side. I'm willing to accept this
> > >>> sort
> > >>>> of
> > >>>>>>>>>> proposal if we can make it clear that this sort of thing is
> > >>>> happening
> > >>>>>>>> and
> > >>>>>>>>>> we limit the blast radius for what we can do.
> > >>>>>>>>>>
> > >>>>>>>>>> Justine
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> > >>>> <chr...@aiven.io.invalid
> > >>>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some
> > >>>> thoughts
> > >>>>>>>>> that
> > >>>>>>>>>>> I'd like to see addressed before voting.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable
> interface,
> > >>> why
> > >>>>>>>> are
> > >>>>>>>>> we
> > >>>>>>>>>>> only limiting the uses for this interface to two very
> specific
> > >>>>>>>>> scenarios?
> > >>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled
> > >> as
> > >>>>>>> well
> > >>>>>>>>>>> (allowing users to drop records destined for some off-limits
> > >>>>>>> topics,
> > >>>>>>>> or
> > >>>>>>>>>>> retry for a limited duration in case there's a delay in the
> > >>>>>>>> propagation
> > >>>>>>>>>> of
> > >>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of other
> > >> errors
> > >>>>>>> that
> > >>>>>>>>>> could
> > >>>>>>>>>>> be handled with this new API, both to avoid the follow-up
> work
> > >>> of
> > >>>>>>>>> another
> > >>>>>>>>>>> KIP to address them in the future, and to make sure that
> we're
> > >>> not
> > >>>>>>>>>> painting
> > >>>>>>>>>>> ourselves into a corner with the current API in a way that
> > >> would
> > >>>>>>> make
> > >>>>>>>>>>> future modifications difficult.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
> > >>> non-retriable
> > >>>>>>>>> errors
> > >>>>>>>>>>> are handled with the interface. Why not introduce two
> separate
> > >>>>>>>> methods
> > >>>>>>>>> to
> > >>>>>>>>>>> handle each case separately? That way there's no ambiguity or
> > >>>>>>>> implicit
> > >>>>>>>>>>> behavior when, e.g., attempting to retry on a
> > >>>>>>>> RecordTooLargeException.
> > >>>>>>>>>> This
> > >>>>>>>>>>> could be something like `NonRetriableResponse
> > >>>>>>> handle(ProducerRecord,
> > >>>>>>>>>>> Exception)` and `RetriableResponse
> > >>> handleRetriable(ProducerRecord,
> > >>>>>>>>>>> Exception)`, though the exact names and shape can obviously
> be
> > >>>>>>> toyed
> > >>>>>>>>>> with a
> > >>>>>>>>>>> bit.
> > >>>>>>>>>>>
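A rough sketch of the two-method shape suggested in point 2, written against stand-in types so it compiles without the Kafka client jar. Only the method names `handle` and `handleRetriable` and the two response type names come from the message above. `SketchRecord`, `OversizedRecordSketchException`, `SplitExceptionHandler`, `SkipOversizedHandler`, the enum constants and the default return values are assumptions made for illustration, not part of the KIP.

```java
// Hypothetical stand-in for ProducerRecord; only the fields the sketch needs.
final class SketchRecord {
    final String topic;
    final byte[] value;

    SketchRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical stand-in for RecordTooLargeException.
class OversizedRecordSketchException extends RuntimeException {
    OversizedRecordSketchException(String message) {
        super(message);
    }
}

// RETRY is deliberately absent, so "retry a non-retriable error" cannot be expressed.
enum NonRetriableResponse { FAIL, SKIP }

enum RetriableResponse { FAIL, SKIP, RETRY }

interface SplitExceptionHandler {
    // Assumed default: surface the error to the caller.
    default NonRetriableResponse handle(SketchRecord record, Exception e) {
        return NonRetriableResponse.FAIL;
    }

    // Assumed default: keep retrying.
    default RetriableResponse handleRetriable(SketchRecord record, Exception e) {
        return RetriableResponse.RETRY;
    }
}

// Example implementation: drop oversized records, fail on any other non-retriable error.
class SkipOversizedHandler implements SplitExceptionHandler {
    @Override
    public NonRetriableResponse handle(SketchRecord record, Exception e) {
        if (e instanceof OversizedRecordSketchException) {
            return NonRetriableResponse.SKIP;
        }
        return NonRetriableResponse.FAIL;
    }
}
```

With separate return types, an attempt to return RETRY from `handle` fails at compile time instead of being silently treated as FAIL at runtime.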
> > >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may
> > >> benefit
> > >>>>>>> some
> > >>>>>>>>>>> users' custom producer applications and Kafka Streams
> > >>>> applications,
> > >>>>>>>> it
> > >>>>>>>>>>> comes at significant deployment cost for other low-/no-code
> > >>>>>>>>> environments,
> > >>>>>>>>>>> including but not limited to Kafka Connect and MirrorMaker 2.
> > >>> Can
> > >>>>>>> we
> > >>>>>>>>> add
> > >>>>>>>>>> a
> > >>>>>>>>>>> default implementation of the exception handler that allows
> > >> for
> > >>>>>>> some
> > >>>>>>>>>> simple
> > >>>>>>>>>>> behavior to be tweaked via configuration property? Two things
> > >>> that
> > >>>>>>>>> would
> > >>>>>>>>>> be
> > >>>>>>>>>>> nice to have would be A) an upper bound on the retry time for
> > >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to drop
> > >>>> records
> > >>>>>>>>> that
> > >>>>>>>>>>> are large enough to trigger a record-too-large exception.
> > >>>>>>>>>>>
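One possible reading of the configurable default handler asked for in point 3, as a sketch only. The class name `ConfigurableDefaultHandler`, the enum `TimedResponse`, both constructor parameters and the idea of passing the elapsed retry time into the handler are assumptions; nothing here is an existing producer API or config key.

```java
import java.time.Duration;

// Hypothetical response type for this sketch.
enum TimedResponse { FAIL, SKIP, RETRY }

// Hypothetical default handler driven by two settings:
// A) an upper bound on retry time for unknown-topic-or-partition errors,
// B) whether records that are too large are dropped instead of failing.
final class ConfigurableDefaultHandler {
    private final Duration unknownTopicRetryTimeout;
    private final boolean dropOversizedRecords;

    ConfigurableDefaultHandler(Duration unknownTopicRetryTimeout, boolean dropOversizedRecords) {
        this.unknownTopicRetryTimeout = unknownTopicRetryTimeout;
        this.dropOversizedRecords = dropOversizedRecords;
    }

    // elapsed: how long the record has already been retried (assumed to be supplied by the caller).
    TimedResponse onUnknownTopicOrPartition(Duration elapsed) {
        // Retry while strictly under the bound; fail once the bound is reached.
        return elapsed.compareTo(unknownTopicRetryTimeout) < 0
                ? TimedResponse.RETRY
                : TimedResponse.FAIL;
    }

    TimedResponse onRecordTooLarge() {
        return dropOversizedRecords ? TimedResponse.SKIP : TimedResponse.FAIL;
    }
}
```

A time bound rather than a boolean leaves room for a topic that has only just been created to become visible before the handler gives up.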
> > >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the
> > >>>> proposed
> > >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
> > >>> especially
> > >>>>>>>> when
> > >>>>>>>>>>> trying to guess the behavior for retriable errors.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Chris
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> > >>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> A summary of the KIP and the discussions:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The KIP introduces a handler interface for Producer in order
> > >> to
> > >>>>>>>>> handle
> > >>>>>>>>>>> two
> > >>>>>>>>>>>> exceptions: RecordTooLargeException and
> > >>>>>>>>>> UnknownTopicOrPartitionException.
> > >>>>>>>>>>>> The handler handles the exceptions per-record.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
> > >> sections]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer
> > >>>>>>> collects
> > >>>>>>>>>>> multiple
> > >>>>>>>>>>>> records in batches. Then a RecordTooLargeException related
> > >> to a
> > >>>>>>>>> single
> > >>>>>>>>>>>> record leads to failing the entire batch. A custom exception
> > >>>>>>>> handler
> > >>>>>>>>> in
> > >>>>>>>>>>>> this case may decide on dropping the record and continuing
> > >> the
> > >>>>>>>>>>> processing.
> > >>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a
> > >> record
> > >>>>>>>> that
> > >>>>>>>>> is
> > >>>>>>>>>>> too
> > >>>>>>>>>>>> large is a poison pill record, and there is no way to skip
> > >> over
> > >>>>>>>> it. A
> > >>>>>>>>>>>> handler would allow us to react to this error inside the
> > >>>>>>> producer,
> > >>>>>>>>>> i.e.,
> > >>>>>>>>>>>> local to where the error happens, and thus simplify the
> > >> overall
> > >>>>>>>> code
> > >>>>>>>>>>>> significantly. Please read the Motivation section for more
> > >>>>>>>>> explanation.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the
> producer
> > >>>>>>>> handles
> > >>>>>>>>>>> this
> > >>>>>>>>>>>> exception internally and only issues a WARN log about
> missing
> > >>>>>>>>> metadata
> > >>>>>>>>>>> and
> > >>>>>>>>>>>> retries internally. Later, when the producer hits "
> > >>>>>>>>> delivery.timeout.ms"
> > >>>>>>>>>>> it
> > >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly
> > >> retry,
> > >>>>>>>> which
> > >>>>>>>>>> may
> > >>>>>>>>>>>> result in an infinite retry loop. The thrown
> TimeoutException
> > >>>>>>>> "cuts"
> > >>>>>>>>>> the
> > >>>>>>>>>>>> connection to the underlying root cause of missing metadata
> > >>>>>>> (which
> > >>>>>>>>>> could
> > >>>>>>>>>>>> indeed be a transient error but is persistent for a
> > >>> non-existing
> > >>>>>>>>>> topic).
> > >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite
> > >> retry
> > >>>>>>>> loop.
> > >>>>>>>>>>> Kafka
> > >>>>>>>>>>>> Streams also blindly retries for this case, and the
> > >> application
> > >>>>>>>> gets
> > >>>>>>>>>>> stuck.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Having interface vs configuration option: [Motivation,
> > >>>>>>> Examples,
> > >>>>>>>>> and
> > >>>>>>>>>>>> Rejected Alternatives sections]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Our solution is introducing an interface due to the full
> > >>>>>>>> flexibility
> > >>>>>>>>>> that
> > >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones,
> > >>>>>>>> determine
> > >>>>>>>>>> the
> > >>>>>>>>>>>> handler's behaviour based on the situation. For example,
> > >>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may want
> > >> to
> > >>>>>>>> raise
> > >>>>>>>>> an
> > >>>>>>>>>>>> error for some topics but retry it for other topics. Having
> a
> > >>>>>>>>>>> configuration
> > >>>>>>>>>>>> option with a fixed set of possibilities does not serve the
> > >>>>>>> user's
> > >>>>>>>>>>>> needs. See Example 2, please.
> > >>>>>>>>>>>>
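The per-topic decision described above (raise an error for some topics, keep retrying for others) could look roughly like the following. This is an illustrative sketch, not the KIP's Example 2: `PerTopicUnknownTopicHandler`, `TopicResponse` and the `failFastTopics` set are hypothetical names, and the handler takes a plain topic string instead of a real ProducerRecord so that the snippet stands alone.

```java
import java.util.Set;

// Hypothetical response type for this sketch.
enum TopicResponse { FAIL, RETRY }

// Hypothetical handler: fail immediately for topics that are expected to exist,
// keep retrying for topics that may still be in the process of being created.
final class PerTopicUnknownTopicHandler {
    private final Set<String> failFastTopics;

    PerTopicUnknownTopicHandler(Set<String> failFastTopics) {
        // Defensive, immutable copy of the caller's set.
        this.failFastTopics = Set.copyOf(failFastTopics);
    }

    TopicResponse onUnknownTopicOrPartition(String topic) {
        return failFastTopics.contains(topic) ? TopicResponse.FAIL : TopicResponse.RETRY;
    }
}
```

A fixed configuration option could express "always retry" or "never retry", but not a choice that depends on which topic the record was headed for.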
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
> > >> section]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If the custom handler decides on SWALLOW for
> > >>>>>>>> RecordTooLargeException,
> > >>>>>>>>>>> then
> > >>>>>>>>>>>> this record will not be a part of the batch of transactions
> > >> and
> > >>>>>>>> will
> > >>>>>>>>>> also
> > >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So no
> > >>>>>>> worries
> > >>>>>>>>>> about
> > >>>>>>>>>>>> getting a RecordTooLargeException from the broker in this
> > >> case,
> > >>>>>>> as
> > >>>>>>>>> the
> > >>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW means
> > >>> drop
> > >>>>>>>> the
> > >>>>>>>>>>> record
> > >>>>>>>>>>>> and continue/swallow the error.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - What if the handle() method implements RETRY for
> > >>>>>>>>>>> RecordTooLargeException?
> > >>>>>>>>>>>> [Proposed Changes section]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for
> > >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal to
> > >> FAIL.
> > >>>>>>>> This
> > >>>>>>>>> is
> > >>>>>>>>>>>> well documented/informed in javadoc.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - What if the handle() method of the handler throws an
> > >>> exception?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The handler is expected to have correct code. If it throws
> an
> > >>>>>>>>>> exception,
> > >>>>>>>>>>>> everything fails.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This is a PoC PR <
> https://github.com/apache/kafka/pull/15846
> > >>>
> > >>>>>>> ONLY
> > >>>>>>>>> for
> > >>>>>>>>>>>> RecordTooLargeException. The code changes related to
> > >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR
> > >>> LATER.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Looking forward to your feedback again.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
> > >> k...@kirktrue.pro>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the updates!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Comments inline...
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> > >>>>>>>>>>> <asae...@confluent.io.INVALID
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Addressing some of the main concerns:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by broker,
> > >>>>>>>> producer
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
> > >> interface
> > >>>>>>>> is
> > >>>>>>>>>>>>> introduced
> > >>>>>>>>>>>>>> to affect only the exceptions thrown from the producer.
> > >> This
> > >>>>>>>> KIP
> > >>>>>>>>>> very
> > >>>>>>>>>>>>>> specifically means to provide a possibility to manage the
> > >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the Producer.send()
> > >>>>>>>> method.
> > >>>>>>>>>>>> Please
> > >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
> > >>>>>>> investigated
> > >>>>>>>>> the
> > >>>>>>>>>>>> issue
> > >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern about
> > >> how
> > >>>>>>>> we
> > >>>>>>>>>>> handle
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>> errors as well.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are
> called
> > >>>>>>>> when
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> record
> > >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not the
> > >>>>>>>> desired
> > >>>>>>>>>>> time
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions beforehand.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when
> > >>>>>>> Callback
> > >>>>>>>> is
> > >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> > >>>>>>>>>>>>> `RecordTooLargeException`? I
> > >>>>>>>>>>>>>> assume changing the producer configuration at runtime is
> > >>>>>>>>> possible.
> > >>>>>>>>>> If
> > >>>>>>>>>>>> so,
> > >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in the
> > >>>>>>> next
> > >>>>>>>>>> try,
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> too large record is not poisoning any more. I am not 100%
> > >>>>>>> sure
> > >>>>>>>>>> about
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> technical details, though. Otherwise, we can consider the
> > >>>>>>> RETRY
> > >>>>>>>>> as
> > >>>>>>>>>>> FAIL
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>> this exception. Another solution would be to consider a
> > >>>>>>>> constant
> > >>>>>>>>>>> number
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>> times for RETRY which can be useful for other exceptions
> as
> > >>>>>>>> well.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> It’s not presently possible to change the configuration of
> > >> an
> > >>>>>>>>>> existing
> > >>>>>>>>>>>>> Producer at runtime. So if a record hits a
> > >>>>>>>> RecordTooLargeException
> > >>>>>>>>>>> once,
> > >>>>>>>>>>>> no
> > >>>>>>>>>>>>> amount of retrying (with the current Producer) will change
> > >>> that
> > >>>>>>>>> fact.
> > >>>>>>>>>>> So
> > >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of
> > >> RETRY
> > >>>>>>> for
> > >>>>>>>>> an
> > >>>>>>>>>>>>> “oversized” record.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - What if the handle() method itself throws an exception?
> I
> > >>>>>>>> think
> > >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be
> exactly
> > >>>>>>>> like
> > >>>>>>>>>> when
> > >>>>>>>>>>>> no
> > >>>>>>>>>>>>>> custom handler is defined since the user actually did not
> > >>>>>>> have
> > >>>>>>>> a
> > >>>>>>>>>>>> working
> > >>>>>>>>>>>>>> handler.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the
> > >> right
> > >>>>>>>>>> choice.
> > >>>>>>>>>>> It
> > >>>>>>>>>>>>> then becomes a silent failure that might have
> repercussions,
> > >>>>>>>>>> depending
> > >>>>>>>>>>> on
> > >>>>>>>>>>>>> the business logic. A user would have to proactively trawl
> > >>>>>>>> through
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - Why not use config parameters instead of an interface?
> As
> > >>>>>>>>>> explained
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that the
> > >>>>>>> handler
> > >>>>>>>>>> will
> > >>>>>>>>>>> be
> > >>>>>>>>>>>>>> used for a greater number of exceptions in the future.
> > >>>>>>>> Defining a
> > >>>>>>>>>>>>>> configuration parameter for each exception may make the
> > >>>>>>>>>>> configuration a
> > >>>>>>>>>>>>> bit
> > >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird
> > >> and
> > >>>>>>>>>> limiting.
> > >>>>>>>>>>>>> Forget I ever suggested it ;)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I’d think additional background in the Motivation section
> > >>> would
> > >>>>>>>>> help
> > >>>>>>>>>> me
> > >>>>>>>>>>>>> understand how users might use this feature beyond a)
> > >> skipping
> > >>>>>>>>>>>> “oversized”
> > >>>>>>>>>>>>> records, and b) not retrying missing topics.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Small change:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for
> brevity
> > >>>>>>> and
> > >>>>>>>>>>>>> simplicity.
> > >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The name change sounds good to me.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks Alieh!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I thank you all again for your useful
> > >> questions/suggestions.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated
> > >> in
> > >>>>>>>> some
> > >>>>>>>>>>>>> feedback.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks Alieh for the updates.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It
> > >>>>>>> seems
> > >>>>>>>>>> like
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic
> > >>>>>>> handler.
> > >>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we
> > >>>>>>> want
> > >>>>>>>>> to
> > >>>>>>>>>>>>> handle,
> > >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic thing
> > >> for
> > >>>>>>>> two
> > >>>>>>>>>>>> specific
> > >>>>>>>>>>>>>>> errors.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve
> > >>>>>>>> these
> > >>>>>>>>>>>>> problems. I
> > >>>>>>>>>>>>>>> agree though that we will need something more than the
> > >> error
> > >>>>>>>>>> classes
> > >>>>>>>>>>>> I'm
> > >>>>>>>>>>>>>>> proposing if we want to have different handling be
> > >>>>>>>> configurable.
> > >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means
> > >>>>>>> that
> > >>>>>>>> we
> > >>>>>>>>>> are
> > >>>>>>>>>>>>>>> creating more problems than we are solving. It is still
> > >>>>>>>> unclear
> > >>>>>>>>> to
> > >>>>>>>>>>> me
> > >>>>>>>>>>>>> how
> > >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could include
> > >> an
> > >>>>>>>>>> example?
> > >>>>>>>>>>>> It
> > >>>>>>>>>>>>>>> seems like there is a specific use case in mind and maybe
> > >> we
> > >>>>>>>> can
> > >>>>>>>>>>> make
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>>> design that is tighter and supports that case.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <
> > >>>>>>> k...@kirktrue.pro>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the KIP!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> A few questions:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it
> > >>>>>>>>>> generates
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY?
> > >>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for
> > >>>>>>> the
> > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException since we get that
> > >> response
> > >>>>>>>>> when
> > >>>>>>>>>>>>>>> sending a
> > >>>>>>>>>>>>>>>> batch of records?
> > >>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method
> > >>>>>>>> itself
> > >>>>>>>>>>>> throws
> > >>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>> error?
> > >>>>>>>>>>>>>>>> K4. What is the downside of adding an onError() method
> to
> > >>>>>>> the
> > >>>>>>>>>>>>> Producer’s
> > >>>>>>>>>>>>>>>> Callback interface vs. a new mechanism?
> > >>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse” to
> > >>>>>>> just
> > >>>>>>>>>>>> “Response”
> > >>>>>>>>>>>>>>>> given that it’s an inner enum?
> > >>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle
> > >>>>>>>> different
> > >>>>>>>>>>>>> behavior
> > >>>>>>>>>>>>>>>> for different topics?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me
> > >>>>>>> understand
> > >>>>>>>>> why
> > >>>>>>>>>> we
> > >>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>> another handler class if there were more examples in the
> > >>>>>>>>>> Motivation
> > >>>>>>>>>>>>>>>> section. As it stands now, I agree with Chris that the
> > >>>>>>> stated
> > >>>>>>>>>>> issues
> > >>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>> be solved by adding two new configuration options:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>    oversized.record.behavior=fail
> > >>>>>>>>>>>>>>>>    retry.on.unknown.topic.or.partition=true
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what
> > >>>>>>> exactly
> > >>>>>>>>>> would
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> logic in the handler be? I’m not very imaginative, so
> I’m
> > >>>>>>>>>> assuming
> > >>>>>>>>>>>>> they’d
> > >>>>>>>>>>>>>>>> mostly be if-this-then-that. However, if they’re more
> > >>>>>>>>>> complicated,
> > >>>>>>>>>>>> I’d
> > >>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>> other concerns.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Kirk
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> > >>>>>>>>>>>>> <asae...@confluent.io.INVALID
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thank you all for the feedback!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving
> the
> > >>>>>>>> user
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> ability
> > >>>>>>>>>>>>>>>>> to handle producer exceptions, but to be more
> > >> conservative
> > >>>>>>>> and
> > >>>>>>>>>>> avoid
> > >>>>>>>>>>>>>>>> future
> > >>>>>>>>>>>>>>>>> issues, we decided to be limited to a short list of
> > >>>>>>>>> exceptions.
> > >>>>>>>>>> I
> > >>>>>>>>>>>>>>>> included
> > >>>>>>>>>>>>>>>>> *RecordTooLargeException* and
> > >>>>>>>>> *UnknownTopicOrPartitionException*.
> > >>>>>>>>>>>> Open
> > >>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> suggestion for adding some more ;-)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> KIP Updates:
> > >>>>>>>>>>>>>>>>> - clarified the way that the user should configure the
> > >>>>>>>>> Producer
> > >>>>>>>>>> to
> > >>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> custom handler. I think adding a producer config
> > >> property
> > >>>>>>> is
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>> cleanest
> > >>>>>>>>>>>>>>>>> one.
> > >>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to
> > >>>>>>>>>>> *ProducerExceptionHandler*
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>> closer to what we are changing.
> > >>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of
> the
> > >>>>>>>>>> handle()
> > >>>>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>> well.
> > >>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and
> two
> > >>>>>>>> types
> > >>>>>>>>>> of
> > >>>>>>>>>>>>>>>> continue.
> > >>>>>>>>>>>>>>>>> - The default behaviour is having no custom handler,
> > >>>>>>> having
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>>>> corresponding config parameter set to null. Therefore,
> > >> the
> > >>>>>>>> KIP
> > >>>>>>>>>>>>> provides
> > >>>>>>>>>>>>>>>> no
> > >>>>>>>>>>>>>>>>> default implementation of the interface.
> > >>>>>>>>>>>>>>>>> - We follow the interface solution as described in the
> > >>>>>>>>>>>>>>>>> Rejected Alternatives section.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
> > >>>>>>>>>> mj...@apache.org
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important
> > >> case
> > >>>>>>>> for
> > >>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>> handling.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert
> API,
> > >>>>>>> as
> > >>>>>>>>>>>> mentioned
> > >>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>> a few people. But I don't think it would be a reason
> to
> > >>>>>>> not
> > >>>>>>>>> add
> > >>>>>>>>>>> it.
> > >>>>>>>>>>>>>>> It's
> > >>>>>>>>>>>>>>>>>> always a tricky tradeoff what to expose to users and
> to
> > >>>>>>>> avoid
> > >>>>>>>>>>> foot
> > >>>>>>>>>>>>>>> guns,
> > >>>>>>>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and
> > >> have
> > >>>>>>>> good
> > >>>>>>>>>>>>>>> experience
> > >>>>>>>>>>>>>>>>>> with it. Hence, I understand, but don't share the
> > >> concern
> > >>>>>>>>>> raised.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I also agree that there is some responsibility by the
> > >>>>>>> user
> > >>>>>>>> to
> > >>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>> how such a handler should be implemented to not drop
> > >> data
> > >>>>>>>> by
> > >>>>>>>>>>>>> accident.
> > >>>>>>>>>>>>>>>>>> But it seems unavoidable and acceptable.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg
> > >> via
> > >>>>>>>>>>> configs)
> > >>>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>> also work, I personally prefer a full handler. Configs
> > >>>>>>> have
> > >>>>>>>>> the
> > >>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>> issue that they could be misused, potentially leading
> > >> to
> > >>>>>>>>>>>> incorrectly
> > >>>>>>>>>>>>>>>>>> dropped data, but at the same time are less flexible
> > >> (and
> > >>>>>>>>> thus
> > >>>>>>>>>>>> maybe
> > >>>>>>>>>>>>>>>>>> even harder to use correctly...?). Based on my
> > >> experience,
> > >>>>>>>>> there
> > >>>>>>>>>>> is
> > >>>>>>>>>>>>>>> also
> > >>>>>>>>>>>>>>>>>> often a weird corner case for which it makes sense to
> also
> > >>>>>>>> drop
> > >>>>>>>>>>>> records
> > >>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>> other exceptions, and a full handler has the advantage
> > >> of
> > >>>>>>>>> full
> > >>>>>>>>>>>>>>>>>> flexibility and "absolute power!".
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the
> > >>>>>>>> producer
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> detail, so please keep me honest. But my
> understanding
> > >>>>>>> is,
> > >>>>>>>>>> that
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>> aims to allow users to react to internal exceptions, and
> and
> > >>>>>>>>> decide
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> keep
> > >>>>>>>>>>>>>>>>>> retrying internally, swallow the error and drop the
> > >>>>>>> record,
> > >>>>>>>>> or
> > >>>>>>>>>>>> raise
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> error?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more
> > >> precises
> > >>>>>>>>> what
> > >>>>>>>>>>>> error
> > >>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> want to cover -- I don't think this list must be
> > >>>>>>>> exhaustive,
> > >>>>>>>>> as
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>> always do a follow-up KIP to also apply the handler to
> > >>>>>>> other
> > >>>>>>>>>> errors
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> expand the scope of the handler. The KIP does mention
> > >>>>>>>>> examples,
> > >>>>>>>>>>> but
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>> might be good to explicitly state for what cases the
> > >>>>>>>> handler
> > >>>>>>>>>> gets
> > >>>>>>>>>>>>>>>> applied?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough
> > >>>>>>> options?
> > >>>>>>>>>> Don't
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> need three options? Or would `CONTINUE` have different
> > >>>>>>>>> meaning
> > >>>>>>>>>>>>>>> depending
> > >>>>>>>>>>>>>>>>>> on the type of error? Ie, for a retryable error
> > >>>>>>> `CONTINUE`
> > >>>>>>>>>> would
> > >>>>>>>>>>>> mean
> > >>>>>>>>>>>>>>>>>> keep retrying internally, but for a non-retryable
> error
> > >>>>>>>>>>> `CONTINUE`
> > >>>>>>>>>>>>>>> means
> > >>>>>>>>>>>>>>>>>> swallow the error and drop the record? This semantic
> > >>>>>>>> overload
> > >>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>> tricky for users to reason about, so it might be better to
> > >>>>>>>> split
> > >>>>>>>>>>>>>>> `CONTINUE`
> > >>>>>>>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some
> better
> > >>>>>>>>> names).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Additionally, should we just ship a
> > >>>>>>>>>>> `DefaultClientExceptionHandler`
> > >>>>>>>>>>>>>>>>>> which would return `FAIL`, for backward compatibility?
> > >> Or
> > >>>>>>>>> don't
> > >>>>>>>>>>>> have
> > >>>>>>>>>>>>>>> any
> > >>>>>>>>>>>>>>>>>> default handler to begin with and allow it to be
> > >> `null`?
> > >>>>>>> I
> > >>>>>>>>>> don't
> > >>>>>>>>>>>> see
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To
> > >> me,
> > >>>>>>>> the
> > >>>>>>>>>>> goal
> > >>>>>>>>>>>>>>>>>> should be to not modify the default behavior at all,
> > >> but
> > >>>>>>> to
> > >>>>>>>>>> just
> > >>>>>>>>>>>>> allow
> > >>>>>>>>>>>>>>>>>> users to change the default behavior if there is a
> > >> need.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> What is missing in the KIP, though, is how the handler is
> > >>>>>>>>>>>>>>>>>> passed into the producer. Would we need a new config which
> > >>>>>>>>>>>>>>>>>> allows setting a custom handler? And/or would we allow
> > >>>>>>>>>>>>>>>>>> passing in an instance via the constructor, or add a new
> > >>>>>>>>>>>>>>>>>> method to set a handler?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer is
> > >>>>>>>>>>>>>>>>>>> really not ideal. It’s even harder working out what’s going
> > >>>>>>>>>>>>>>>>>>> on with the consumer.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with Chris that
> > >>>>>>>>>>>>>>>>>>> it could do with additional motivation. This would be an
> > >>>>>>>>>>>>>>>>>>> expert-level interface given how complicated the exception
> > >>>>>>>>>>>>>>>>>>> handling for Kafka has become.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 7. The application is not really aware of the batching being
> > >>>>>>>>>>>>>>>>>>> done on its behalf. The ProduceResponse can actually return
> > >>>>>>>>>>>>>>>>>>> an array of records which failed per batch. If you get
> > >>>>>>>>>>>>>>>>>>> RecordTooLargeException, and want to retry, you probably
> > >>>>>>>>>>>>>>>>>>> need to remove the offending records from the batch and
> > >>>>>>>>>>>>>>>>>>> retry it. This is getting fiddly.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I
> > >>>>>>>>>>>>>>>>>>> wonder whether an alternative might be to add a method to
> > >>>>>>>>>>>>>>>>>>> the existing Callback interface, such as:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>  ClientExceptionResponse onException(Exception exception)
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> It would be called when a ProduceResponse contains an error,
> > >>>>>>>>>>>>>>>>>>> but the producer is going to retry. It tells the producer
> > >>>>>>>>>>>>>>>>>>> whether to go ahead with the retry or not. The default
> > >>>>>>>>>>>>>>>>>>> implementation would be to CONTINUE, because that’s just
> > >>>>>>>>>>>>>>>>>>> continuing to retry as planned. Note that this is a
> > >>>>>>>>>>>>>>>>>>> per-record callback, so the application would be able to
> > >>>>>>>>>>>>>>>>>>> understand which records failed.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> By using an existing interface, we already know how to
> > >>>>>>>>>>>>>>>>>>> configure it and we know about the threading model for
> > >>>>>>>>>>>>>>>>>>> calling it.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>> Andrew
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for the KIP! The issue with writing to non-existent
> > >>>>>>>>>>>>>>>>>>>> topics is particularly frustrating for users of Kafka
> > >>>>>>>>>>>>>>>>>>>> Connect and has been the source of a handful of Jira
> > >>>>>>>>>>>>>>>>>>>> tickets over the years. My thoughts:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 1. An additional detail we can add to the motivation (or
> > >>>>>>>>>>>>>>>>>>>> possibly rejected alternatives) section is that this kind
> > >>>>>>>>>>>>>>>>>>>> of custom retry logic can't be implemented by hand by,
> > >>>>>>>>>>>>>>>>>>>> e.g., setting retries to 0 in the producer config and
> > >>>>>>>>>>>>>>>>>>>> handling exceptions at the application level. Or rather, it
> > >>>>>>>>>>>>>>>>>>>> can, but 1) it's a bit painful to have to reimplement at
> > >>>>>>>>>>>>>>>>>>>> every call-site for Producer::send (and any code that
> > >>>>>>>>>>>>>>>>>>>> awaits the returned Future) and 2) it's impossible to do
> > >>>>>>>>>>>>>>>>>>>> this without losing idempotency on retries.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 2. That said, I wonder if a pluggable interface is really
> > >>>>>>>>>>>>>>>>>>>> the right call here. Documenting the interactions of a
> > >>>>>>>>>>>>>>>>>>>> producer with a ClientExceptionHandler instance will be
> > >>>>>>>>>>>>>>>>>>>> tricky, and implementing them will also be a fair amount of
> > >>>>>>>>>>>>>>>>>>>> work. I believe that there needs to be some more
> > >>>>>>>>>>>>>>>>>>>> granularity for how writes to non-existent topics (or
> > >>>>>>>>>>>>>>>>>>>> really, UNKNOWN_TOPIC_OR_PARTITION and related errors from
> > >>>>>>>>>>>>>>>>>>>> the broker) are handled, but I'm torn between keeping it
> > >>>>>>>>>>>>>>>>>>>> simple with maybe one or two new producer config
> > >>>>>>>>>>>>>>>>>>>> properties, or a full-blown pluggable interface. If there
> > >>>>>>>>>>>>>>>>>>>> are more cases that would benefit from a pluggable
> > >>>>>>>>>>>>>>>>>>>> interface, it would be nice to identify these and add them
> > >>>>>>>>>>>>>>>>>>>> to the KIP to strengthen the motivation. Right now, I'm not
> > >>>>>>>>>>>>>>>>>>>> sure the two that are mentioned in the motivation are
> > >>>>>>>>>>>>>>>>>>>> sufficient.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3. Alternatively, a possible compromise is for this KIP to
> > >>>>>>>>>>>>>>>>>>>> introduce new properties that dictate how to handle
> > >>>>>>>>>>>>>>>>>>>> unknown-topic-partition and record-too-large errors, with
> > >>>>>>>>>>>>>>>>>>>> the thinking that if we introduce a pluggable interface
> > >>>>>>>>>>>>>>>>>>>> later on, these properties will be recognized by the
> > >>>>>>>>>>>>>>>>>>>> default implementation of that interface but could be
> > >>>>>>>>>>>>>>>>>>>> completely ignored or replaced by alternative
> > >>>>>>>>>>>>>>>>>>>> implementations.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 4. (Nit) You can remove the "This page is meant as a
> > >>>>>>>>>>>>>>>>>>>> template for writing a KIP..." part from the KIP. It's not
> > >>>>>>>>>>>>>>>>>>>> a template anymore :)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 5. If we do go the pluggable interface route, wouldn't we
> > >>>>>>>>>>>>>>>>>>>> want to add the possibility for retry logic? The simplest
> > >>>>>>>>>>>>>>>>>>>> version of this could be to add a RETRY value to the
> > >>>>>>>>>>>>>>>>>>>> ClientExceptionHandlerResponse enum.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of
> > >>>>>>>>>>>>>>>>>>>> "CONTINUE" for the ClientExceptionHandlerResponse enum,
> > >>>>>>>>>>>>>>>>>>>> since they cause records to be dropped.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I echo what Omnia says, I'm not sure I understand the
> > >>>>>>>>>>>>>>>>>>>>> implications of the change and I think more detail is
> > >>>>>>>>>>>>>>>>>>>>> needed.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> This comment also confused me a bit:
> > >>>>>>>>>>>>>>>>>>>>> * {@code ClientExceptionHandler} that continues the
> > >>>>>>>>>>>>>>>>>>>>> transaction even if a record is too large.
> > >>>>>>>>>>>>>>>>>>>>> * Otherwise, it makes the transaction to fail.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Relatedly, I've been working with some folks on a KIP for
> > >>>>>>>>>>>>>>>>>>>>> transaction errors and how they are handled. Specifically
> > >>>>>>>>>>>>>>>>>>>>> for the RecordTooLargeException (and a few other errors),
> > >>>>>>>>>>>>>>>>>>>>> we want to give a new error category for this error that
> > >>>>>>>>>>>>>>>>>>>>> allows the application to choose how it is handled. Maybe
> > >>>>>>>>>>>>>>>>>>>>> this KIP is something that you are looking for? Stay
> > >>>>>>>>>>>>>>>>>>>>> tuned :)
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! I have a couple of comments:
> > >>>>>>>>>>>>>>>>>>>>>> - You mentioned in the KIP motivation,
> > >>>>>>>>>>>>>>>>>>>>>>> Another example for which a production exception handler
> > >>>>>>>>>>>>>>>>>>>>>>> could be useful is if a user tries to write into a
> > >>>>>>>>>>>>>>>>>>>>>>> non-existing topic, which returns a retryable error code;
> > >>>>>>>>>>>>>>>>>>>>>>> with infinite retries, the producer would hang retrying
> > >>>>>>>>>>>>>>>>>>>>>>> forever. A handler could help to break the infinite retry
> > >>>>>>>>>>>>>>>>>>>>>>> loop.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> How can the handler differentiate between something
> > >>>>>>>>>>>>>>>>>>>>>> temporary, where it should keep retrying, and something
> > >>>>>>>>>>>>>>>>>>>>>> permanent, like forgetting to create the topic?
> > >>>>>>>>>>>>>>>>>>>>>> Temporary here could be:
> > >>>>>>>>>>>>>>>>>>>>>>   - the producer gets deployed before topic creation
> > >>>>>>>>>>>>>>>>>>>>>>     finishes (especially if topic creation is handled
> > >>>>>>>>>>>>>>>>>>>>>>     via IaC)
> > >>>>>>>>>>>>>>>>>>>>>>   - temporarily offline partitions
> > >>>>>>>>>>>>>>>>>>>>>>   - leadership changing
> > >>>>>>>>>>>>>>>>>>>>>> Isn’t this putting the producer at risk of dropping
> > >>>>>>>>>>>>>>>>>>>>>> records unintentionally?
> > >>>>>>>>>>>>>>>>>>>>>> - Can you please elaborate on what is written in the
> > >>>>>>>>>>>>>>>>>>>>>> compatibility / migration plan section by explaining in a
> > >>>>>>>>>>>>>>>>>>>>>> bit more detail what the changing behaviour is and how
> > >>>>>>>>>>>>>>>>>>>>>> this will impact clients who are upgrading?
> > >>>>>>>>>>>>>>>>>>>>>> - In the proposed changes, can you elaborate in the KIP
> > >>>>>>>>>>>>>>>>>>>>>> on where in the producer lifecycle ClientExceptionHandler
> > >>>>>>>>>>>>>>>>>>>>>> and TransactionExceptionHandler will get triggered, and
> > >>>>>>>>>>>>>>>>>>>>>> how the producer will be configured to point to a custom
> > >>>>>>>>>>>>>>>>>>>>>> implementation?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>>>>>>>>> Omnia
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> > >>>>>>>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I look forward to your feedback!
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>> Alieh
> >
> >
> >
>
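
For readers following the thread, the three-way split Matthias proposes in the quoted message (`RETRY`, `SWALLOW` and `FAIL` in place of an overloaded `CONTINUE`) might look roughly like the sketch below. Every type name here is hypothetical and is not the KIP-1038 API; the `retriable` flag is an assumption about what the producer would tell the handler, and `RecordTooLargeException` is a local stand-in for the real Kafka exception so the example compiles without Kafka on the classpath.

```java
// Hypothetical sketch only: none of these types are part of Kafka or KIP-1038.
enum HandlerResponse { RETRY, SWALLOW, FAIL }

// Local stand-in for org.apache.kafka.common.errors.RecordTooLargeException.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

interface ProducerExceptionHandler {
    // 'retriable' is an assumed flag: how the producer classified the error.
    HandlerResponse handle(Exception exception, boolean retriable);
}

// The "default handler that returns FAIL" option raised in the thread.
final class FailingHandler implements ProducerExceptionHandler {
    @Override
    public HandlerResponse handle(Exception exception, boolean retriable) {
        return HandlerResponse.FAIL;
    }
}

// A custom handler: drop oversized records, retry retriable errors, fail otherwise.
final class SkipOversizedHandler implements ProducerExceptionHandler {
    @Override
    public HandlerResponse handle(Exception exception, boolean retriable) {
        if (exception instanceof RecordTooLargeException) {
            return HandlerResponse.SWALLOW;
        }
        return retriable ? HandlerResponse.RETRY : HandlerResponse.FAIL;
    }
}
```

With three distinct values, "keep retrying" and "drop the record" no longer share one name, which is the ambiguity the quoted message points out. How such a handler would interact with `retries`, `delivery.timeout.ms` or `max.block.ms` is left open here, as it is in the thread.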
