Hi Alieh,
Just a few more comments on the KIP. It is looking much less risky now the scope
is tighter.

[AJS1] It would be nice to have default implementations of the handle methods
so an implementor would not need to implement both themselves.

[AJS2] Producer configurations which are class names usually end in “.class”.
I suggest “custom.exception.handler.class”.

[AJS3] If I implemented a handler, and I set a non-default value for one of the
new configuations, what happens? I would expect that the handler takes
precedence. I wasn’t quite clear what “the control will follow the handler
instructions” meant.

[AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse,
I don’t think you need to state in the comment for ProducerExceptionHandler that
RETRY will be interpreted as FAIL.

Thanks,
Andrew

> On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>
> Hi all,
>
>
> Thanks for the very interesting discussion during my PTO.
>
>
> KIP updates and addressing concerns:
>
>
> 1) Two handle() methods are defined in ProducerExceptionHandler for the two
> exceptions with different input parameters so that we have
> handle(RecordTooLargeException e, ProducerRecord record) and
> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
>
>
> 2) The ProducerExceptionHandler extends `Closable` as well.
>
>
> 3) The KIP suggests having two more configuration parameters with boolean
> values:
>
> - `drop.invalid.large.records` with a default value of `false` for
> swallowing too large records.
>
> - `retry.unknown.topic.partition` with a default value of `true` that
> performs RETRY for `max.block.ms` ms, encountering the
> UnknownTopicOrPartitionException.
>
>
> Hope the main concerns are addressed so that we can go forward with voting.
>
>
> Cheers,
>
> Alieh
>
> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
>> Hi Mathias,
>>
>>> [AL1] While I see the point, I would think having a different callback
>> for every exception might not really be elegant?
>>
>> I'm not sure how to assess the level of elegance of the proposal, but I can
>> comment on the technical characteristics:
>>
>> 1. Having specific interfaces that codify the logic that is currently
>> prescribed in the comments reduce the chance of making a mistake.
>> Commments may get ignored, misuderstood or etc. but if the contract is
>> codified, the compilier will help to enforce the contract.
>> 2. Given that the logic is trickier than it seems (the record-too-large is
>> an example that can easily confuse someone who's not intimately familiar
>> with the nuances of the batching logic), having a little more hoops to jump
>> would give a greater chance that whoever tries to add a new cases pauses
>> and thinks a bit more.
>> 3. As Justine pointed out, having different method will be a forcing
>> function to go through a KIP rather than smuggle new cases through
>> implementation.
>> 4. Sort of a consequence of the previous 3 -- all those things reduce the
>> chance of someone writing the code that works with 2 errors and then when
>> more errors are added in the future will suddenly incorrectly ignore new
>> errors (the example I gave in the previous email).
>>
>>> [AL2 cont.] Similar to AL1, I see such a handler to some extend as
>> business logic. If a user puts a bad filter condition in their KS app, and
>> drops messages
>>
>> I agree that there is always a chance to get a bug and lose messages, but
>> there are generally separation of concerns that has different risk profile:
>> the filtering logic may be more rigorously tested and rarely changed (say
>> an application developer does it), but setting the topics to produce may be
>> done via configuration (e.g. a user of the application does it) and it's
>> generally an expectation that users would get an error when configuration
>> is incorrect.
>>
>> What could be worse is that UnknownTopicOrPartitionException can be an
>> intermittent error, i.e. with a generally correct configuration, there
>> could be metadata propagation problem on the cluster and then a random set
>> of records could get lost.
>>
>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the
>> size of the record upfront is exactly what the KIP proposes? No?
>>
>> It achieves the same result but solves it differently, my proposal:
>>
>> 1. Application checks the validity of a record (maybe via a new
>> validateRecord method) before producing it, and can just exclude it or
>> return an error to the user.
>> 2. Application produces the record -- at this point there are no records
>> that could return record too large, they were either skipped at step 1 or
>> we didn't get here because step 1 failed.
>>
>> Vs. KIP's proposal
>>
>> 1. Application produces the record.
>> 2. Application gets a callback.
>> 3. Application returns the action on how to proceed.
>>
>> The advantage of the former is the clarity of semantics -- the record is
>> invalid (property of the record, not a function of server state or server
>> configuration) and we can clearly know that it is the record that is bad
>> and can never succeed.
>>
>> The KIP-proposed way actually has a very tricky point: it actually handles
>> a subset of record-too-large exceptions.  The broker can return
>> record-too-large and reject the whole batch (but we don't want to ignore
>> those because then we can skip random records that just happened to be in
>> the same batch), in some sense we use the same error for 2 different
>> conditions and understanding that requires pretty deep understanding of
>> Kafka internals.
>>
>> -Artem
>>
>>
>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid
>>>
>> wrote:
>>
>>> My concern with respect to it being fragile: the code that ensures the
>>> error type is internal to the producer. Someone may see it and say, I
>> want
>>> to add such and such error. This looks like internal code, so I don't
>> need
>>> a KIP, and then they can change it to whatever they want thinking it is
>>> within the typical kafka improvement protocol.
>>>
>>> Relying on an internal change to enforce an external API is fragile in my
>>> opinion. That's why I sort of agreed with Artem with enforcing the error
>> in
>>> the method signature -- part of the public API.
>>>
>>> Chris's comments on requiring more information to handler again makes me
>>> wonder if we are solving a problem of lack of information at the
>>> application level with a more powerful solution than we need. (Ie, if we
>>> had more information, could the application close and restart the
>>> transaction rather than having to drop records) But I am happy to
>>> compromise with a handler that we can agree is sufficiently controlled
>> and
>>> documented.
>>>
>>> Justine
>>>
>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid>
>>> wrote:
>>>
>>>> Hi Alieh,
>>>>
>>>> Continuing prior discussions:
>>>>
>>>> 1) Regarding the "flexibility" discussion, my overarching point is
>> that I
>>>> don't see the point in allowing for this kind of pluggable logic
>> without
>>>> also covering more scenarios. Take example 2 in the KIP: if we're going
>>> to
>>>> implement retries only on "important" topics when a topic partition
>> isn't
>>>> found, why wouldn't we also want to be able to do this for other
>> errors?
>>>> Again, taking authorization errors as an example, why wouldn't we want
>> to
>>>> be able to fail when we can't write to "important" topics because the
>>>> producer principal lacks sufficient ACLs, and drop the record if the
>>> topic
>>>> isn't "important"? In a security-conscious environment with
>>>> runtime-dependent topic routing (which is a common feature of many
>> source
>>>> connectors, such as the Debezium connectors), this seems fairly likely.
>>>>
>>>> 2) As far as changing the shape of the API goes, I like Artem's idea of
>>>> splitting out the interface based on specific exceptions. This may be a
>>>> little laborious to expand in the future, but if we really want to
>>>> limit the exceptions that we cover with the handler and move slowly and
>>>> cautiously, then IMO it'd be reasonable to reflect that in the
>>> interface. I
>>>> also acknowledge that there's no way to completely prevent people from
>>>> shooting themselves in the foot by implementing the API incorrectly,
>> but
>>> I
>>>> think it's worth it to do what we can--including leveraging the Java
>>>> language's type system--to help them, so IMO there's value to
>> eliminating
>>>> the implicit behavior of failing when a policy returns RETRY for a
>>>> non-retriable error. This can take a variety of shapes and I'm not
>> going
>>> to
>>>> insist on anything specific, but I do want to again raise my concerns
>>> with
>>>> the current proposal and request that we find something a little
>> better.
>>>>
>>>> 3) Concerning the default implementation--actually, I meant what I
>> wrote
>>> :)
>>>> I don't want a "second" default, I want an implementation of this
>>> interface
>>>> to be used as the default if no others are specified. The behavior of
>>> this
>>>> default implementation would be identical to existing behavior (so
>> there
>>>> would be no backwards compatibility concerns like the ones raised by
>>>> Matthias), but it would be possible to configure this default handler
>>> class
>>>> to behave differently for a basic set of scenarios. This would mirror
>>> (pun
>>>> intended) the approach we've taken with Mirror Maker 2 and its
>>>> ReplicationPolicy interface [1]. There is a default implementation
>>>> available [2] that recognizes a handful of basic configuration
>> properties
>>>> [3] for simple tweaks, but if users want, they can also implement their
>>> own
>>>> replication policy for more fine-grained logic if those properties
>> aren't
>>>> flexible enough.
>>>>
>>>> More concretely, I'm imagining something like this for the producer
>>>> exception handler:
>>>>
>>>> - Default implementation class
>>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
>>>> - This class would recognize two properties:
>>>>  - drop.invalid.large.records: Boolean property, defaults to false. If
>>>> "false", then causes the handler to return FAIL whenever
>>>> a RecordTooLargeException is encountered; if "true", then causes
>>>> SWALLOW/SKIP/DROP to be returned instead
>>>>  - unknown.topic.partition.retry.timeout.ms: Integer property,
>> defaults
>>>> to
>>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered,
>>>> causes the handler to return FAIL if that record has been pending for
>>> more
>>>> than the retry timeout; otherwise, causes RETRY to be returned
>>>>
>>>> I think this is worth addressing now instead of later because it forces
>>> us
>>>> to evaluate the usefulness of this interface and it addresses a
>>>> long-standing issue not just with Kafka Connect, but with the Java
>>> producer
>>>> in general. For reference, here are a few tickets I collected after
>>> briefly
>>>> skimming our Jira showing that this is a real pain point for users:
>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
>>>> frequently reported with Kafka Connect, it applies to anyone who
>>> configures
>>>> a producer to use a high retry timeout. I am aware of the max.block.ms
>>>> property, but it's painful and IMO poor behavior to require users to
>>> reduce
>>>> the value of this property just to handle the single scenario when
>> trying
>>>> to write to topics that don't exist, since it would also limit the
>> retry
>>>> timeout for other operations that are legitimately retriable.
>>>>
>>>> Raising new points:
>>>>
>>>> 5) I don't see the interplay between this handler and existing
>>>> retry-related properties mentioned anywhere in the KIP. I'm assuming
>> that
>>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms"
>>> would
>>>> take precedence over the handler and once they are exhausted, the
>>>> record/batch will fail no matter what? If so, it's probably worth
>> briefly
>>>> mentioning this (no more than a sentence or two) in the KIP, and if
>> not,
>>>> I'm curious what you have in mind.
>>>>
>>>> 6) I also wonder if the API provides enough information in its current
>>>> form. Would it be possible to provide handlers with some way of
>> tracking
>>>> how long a record has been pending for (i.e., how long it's been since
>>> the
>>>> record was provided as an argument to Producer::send)? One way to do
>> this
>>>> could be to add a method like `onNewRecord(ProducerRecord)` and
>>>> allow/require the handler to do its own bookkeeping, probably with a
>>>> matching `onRecordSuccess(ProducerRecord)` method so that the handler
>>>> doesn't eat up an ever-increasing amount of memory trying to track
>>> records.
>>>> An alternative could be to include information about the initial time
>> the
>>>> record was received by the producer and the number of retries that have
>>>> been performed for it as parameters in the handle method(s), but I'm
>> not
>>>> sure how easy this would be to implement and it might clutter things
>> up a
>>>> bit too much.
>>>>
>>>> 7) A small request--can we add Closeable (or, if you prefer,
>>> AutoCloseable)
>>>> as a superinterface for the handler interface?
>>>>
>>>> [1] -
>>>>
>>>>
>>>
>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
>>>> [2] -
>>>>
>>>>
>>>
>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
>>>> [3] -
>>>>
>>>>
>>>
>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
>>>> ,
>>>>
>>>>
>>>
>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
>>>>
>>>> Cheers,
>>>>
>>>> Chris
>>>>
>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>>
>>>>> Very interesting discussion.
>>>>>
>>>>> Seems a central point is the question about "how generic" we approach
>>>>> this, and some people think we need to be conservative and others
>> think
>>>>> we should try to be as generic as possible.
>>>>>
>>>>> Personally, I think following a limited scope for this KIP (by
>>>>> explicitly saying we only cover RecordTooLarge and
>>>>> UnknownTopicOrPartition) might be better. We have concrete tickets
>> that
>>>>> we address, while for other exception (like authorization) we don't
>>> know
>>>>> if people want to handle it to begin with. Boiling the ocean might
>> not
>>>>> get us too far, and being somewhat pragmatic might help to move this
>>> KIP
>>>>> forward. -- I also agree with Justin and Artem, that we want to be
>>>>> careful anyway to not allow users to break stuff too easily.
>>>>>
>>>>> As the same time, I agree that we should setup this change in a
>> forward
>>>>> looking way, and thus having a single generic handler allows us to
>>> later
>>>>> extend the handler more easily. This should also simplify follow up
>> KIP
>>>>> that might add new error cases (I actually mentioned one more to
>> Alieh
>>>>> already, but we both agreed that it might be best to exclude it from
>>> the
>>>>> KIP right now, to make the 3.8 deadline. Doing a follow up KIP is not
>>>>> the end of the world.)
>>>>>
>>>>>
>>>>>
>>>>> @Chris:
>>>>>
>>>>> (2) This sounds fair to me, but not sure how "bad" it actually would
>>> be?
>>>>> If the contract is clearly defined, it seems to be fine what the KIP
>>>>> proposes, and given that such a handler is an expert API, and we can
>>>>> provide "best practices" (cf my other comment below in [AL1]), being
>> a
>>>>> little bit pragmatic sound fine to me.
>>>>>
>>>>> Not sure if I understand Justin's argument on this question?
>>>>>
>>>>>
>>>>> (3) About having a default impl or not. I am fine with adding one,
>> even
>>>>> if I am not sure why Connect could not just add its own one and plug
>> it
>>>>> in (and we would add corresponding configs for Connect, but not for
>> the
>>>>> producer itself)? For this case, we could also do this as a follow up
>>>>> KIP, but happy to include it in this KIP to provide value to Connect
>>>>> right away (even if the value might not come right away if we miss
>> the
>>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we would for
>>> sure
>>>>> plugin our own impl, and lock down the config such that users cannot
>>> set
>>>>> their own handler on the internal producer to begin with. Might be
>> good
>>>>> to elaborate why the producer should have a default? We might
>> actually
>>>>> want to add this to the KIP right away?
>>>>>
>>>>> The key for a default impl would be, to not change the current
>>> behavior,
>>>>> and having no default seems to achieve this. For the two cases you
>>>>> mentioned, it's unclear to me what default value on "upper bound on
>>>>> retires" for UnkownTopicOrPartitionException we should set? Seems it
>>>>> would need to be the same as `delivery.timeout.ms`? However, if
>> users
>>>>> have `delivery.timeout.ms` actually overwritten we would need to set
>>>>> this config somewhat "dynamic"? Is this feasible? If we hard-code 2
>>>>> minutes, it might not be backward compatible. I have the impression
>> we
>>>>> might introduce some undesired coupling? -- For the "record too
>> large"
>>>>> case, the config seems to be boolean and setting it to `false` by
>>>>> default seems to provide backward compatibility.
>>>>>
>>>>>
>>>>>
>>>>> @Artem:
>>>>>
>>>>> [AL1] While I see the point, I would think having a different
>> callback
>>>>> for every exception might not really be elegant? In the end, the
>>> handler
>>>>> is an very advanced feature anyway, and if it's implemented in a bad
>>>>> way, well, it's a user error -- we cannot protect users from
>>> everything.
>>>>> To me, a handler like this, is to some extend "business logic" and
>> if a
>>>>> user gets business logic wrong, it's hard to protect them. -- We
>> would
>>>>> of course provide best practice guidance in the JaveDocs, and explain
>>>>> that a handler should have explicit `if` statements for stuff it want
>>> to
>>>>> handle, and only a single default which return FAIL.
>>>>>
>>>>>
>>>>> [AL2] Yes, but for KS we would retry at the application layer. Ie,
>> the
>>>>> TX is not completed, we clean up and setup out task from scratch, to
>>>>> ensure the pending transaction is completed before we resume. If the
>> TX
>>>>> was indeed aborted, we would retry from older offset and thus just
>> hit
>>>>> the same error again and the loop begins again.
>>>>>
>>>>>
>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extend as
>>>>> business logic. If a user puts a bad filter condition in their KS
>> app,
>>>>> and drops messages, it nothing we can do about it, and this handler
>>>>> IMHO, has a similar purpose. This is also the line of thinking I
>> apply
>>>>> to EOS, to address Justin's concern about "should we allow to drop
>> for
>>>>> EOS", and my answer is "yes", because it's more business logic than
>>>>> actual error handling IMHO. And by default, we fail... So users
>> opt-in
>>>>> to add business logic to drop records. It's an application level
>>>>> decision how to write the code.
>>>>>
>>>>>
>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking
>>> the
>>>>> size of the record upfront is exactly what the KIP proposes? No?
>>>>>
>>>>>
>>>>>
>>>>> @Justin:
>>>>>
>>>>>> I saw the sample
>>>>>> code -- is it just an if statement checking for the error before
>> the
>>>>>> handler is invoked? That seems a bit fragile.
>>>>>
>>>>> What do you mean by fragile? Not sure if I see your point.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
>>>>>> Hi Alieh,
>>>>>>
>>>>>> Thanks for the KIP.  The motivation talks about very specific
>> cases,
>>>> but
>>>>>> the interface is generic.
>>>>>>
>>>>>> [AL1]
>>>>>> If the interface evolves in the future I think we could have the
>>>>> following
>>>>>> confusion:
>>>>>>
>>>>>> 1. A user implemented SWALLOW action for both
>> RecordTooLargeException
>>>> and
>>>>>> UnknownTopicOrPartitionException.  For simpicity they just return
>>>> SWALLOW
>>>>>> from the function, because it elegantly handles all known cases.
>>>>>> 2. The interface has evolved to support a new exception.
>>>>>> 3. The user has upgraded their Kafka client.
>>>>>>
>>>>>> Now a new kind of error gets dropped on the floor without user's
>>>>> intention
>>>>>> and it would be super hard to detect and debug.
>>>>>>
>>>>>> To avoid the confusion, I think we should use handlers for specific
>>>>>> exceptions.  Then if a new exception is added it won't get silently
>>>>> swalled
>>>>>> because the user would need to add new functionality to handle it.
>>>>>>
>>>>>> I also have some higher level comments:
>>>>>>
>>>>>> [AL2]
>>>>>>> it throws a TimeoutException, and the user can only blindly retry,
>>>> which
>>>>>> may result in an infinite retry loop
>>>>>>
>>>>>> If the TimeoutException happens during transactional processing
>>>> (exactly
>>>>>> once is the desired sematnics), then the client should not retry
>> when
>>>> it
>>>>>> gets TimeoutException because without knowing the reason for
>>>>>> TimeoutExceptions, the client cannot know whether the message got
>>>>> actually
>>>>>> produced or not and retrying the message may result in duplicatees.
>>>>>>
>>>>>>> The thrown TimeoutException "cuts" the connection to the
>> underlying
>>>> root
>>>>>> cause of missing metadata
>>>>>>
>>>>>> Maybe we should fix the error handling and return the proper
>>> underlying
>>>>>> message?  Then the application can properly handle the message
>> based
>>> on
>>>>>> preferences.
>>>>>>
>>>>>> From the product perspective, it's not clear how safe it is to
>>> blindly
>>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
>>> situations
>>>>>> when a simple typo could lead to massive data loss (part of the
>> data
>>>>> would
>>>>>> effectively be produced to a "black hole" and the user may not
>> notice
>>>> it
>>>>>> for a while).
>>>>>>
>>>>>> In which situations would you recommend the user to "black hole"
>>>> messages
>>>>>> in case of misconfiguration?
>>>>>>
>>>>>> [AL3]
>>>>>>
>>>>>>> If the custom handler decides on SWALLOW for
>>> RecordTooLargeException,
>>>>>>
>>>>>> Is it my understanding that this KIP proposes that functionality
>> that
>>>>> would
>>>>>> only be able to SWALLOW RecordTooLargeException that happen because
>>> the
>>>>>> producer cannot produce the record (if the broker rejects the
>> batch,
>>>> the
>>>>>> error won't get to the handler, because we cannot know which other
>>>>> records
>>>>>> get ignored).  In this case, why not just check the locally
>>> configured
>>>>> max
>>>>>> record size upfront and not produce the recrord in the first place?
>>>>> Maybe
>>>>>> we can expose a validation function from the producer that could
>>>> validate
>>>>>> the records locally, so we don't need to produce the record in
>> order
>>> to
>>>>>> know that it's invalid.
>>>>>>
>>>>>> -Artem
>>>>>>
>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
>>>>> <jols...@confluent.io.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Alieh and Chris,
>>>>>>>
>>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just
>>>> didn't
>>>>>>> understand how that would be ensured on the producer side. I saw
>> the
>>>>> sample
>>>>>>> code -- is it just an if statement checking for the error before
>> the
>>>>>>> handler is invoked? That seems a bit fragile.
>>>>>>>
>>>>>>> Can you clarify what you mean by `since the code does not reach
>> the
>>> KS
>>>>>>> interface and breaks somewhere in producer.` If we surfaced this
>>> error
>>>>> to
>>>>>>> the application in a better way would that also be a solution to
>> the
>>>>> issue?
>>>>>>>
>>>>>>> Justine
>>>>>>>
>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
>>>>> <asae...@confluent.io.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you, Chris and Justine, for the feedback.
>>>>>>>>
>>>>>>>>
>>>>>>>> @Chris
>>>>>>>>
>>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the one
>>> you
>>>>>>>> mentioned. We are going to cover more exceptions in the future,
>> but
>>>> as
>>>>>>>> Justine mentioned, we must be very conservative about adding more
>>>>>>>> exceptions. Additionally, flexibility mainly means that the user
>> is
>>>>> able
>>>>>>> to
>>>>>>>> develop their own code. As mentioned in the motivation section
>> and
>>>> the
>>>>>>>> examples, sometimes the user decides on dropping a record based
>> on
>>>> the
>>>>>>>> topic, for example.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2) Defining two separate methods for retriable and non-retriable
>>>>>>>> exceptions: although the idea is brilliant, the user may still
>>> make a
>>>>>>>> mistake by implementing the wrong method and see a non-expecting
>>>>>>> behaviour.
>>>>>>>> For example, he may implement handleRetriable() for
>>>>>>> RecordTooLargeException
>>>>>>>> and define SWALLOW for the exception, but in practice, he sees no
>>>>> change
>>>>>>> in
>>>>>>>> default behaviour since he implemented the wrong method. I think
>> we
>>>> can
>>>>>>>> never reduce the user’s mistakes to 0.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 3) Default implementation for Handler: the default behaviour is
>>>> already
>>>>>>>> preserved with NO need of implementing any handler or setting the
>>>>>>>> corresponding config parameter `custom.exception.handler`. What
>> you
>>>>> mean
>>>>>>> is
>>>>>>>> actually having a second default, which requires having both
>>>> interface
>>>>>>> and
>>>>>>>> config parameters. About UnknownTopicOrPartitionException: the
>>>> producer
>>>>>>>> already offers the config parameter `max.block.ms` which
>>> determines
>>>>> the
>>>>>>>> duration of retrying. The main purpose of the user who needs the
>>>>> handler
>>>>>>> is
>>>>>>>> to get the root cause of TimeoutException and handle it in the
>> way
>>> he
>>>>>>>> intends. The KIP explains the necessity of it for KS users.
>>>>>>>>
>>>>>>>>
>>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the error,
>>>> while
>>>>>>>> SKIP means skip the record, I think. If it makes sense for more
>>> ppl,
>>>> I
>>>>>>> can
>>>>>>>> change it to SKIP
>>>>>>>>
>>>>>>>>
>>>>>>>> @Justine
>>>>>>>>
>>>>>>>> 1) was addressed by Chris.
>>>>>>>>
>>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently,
>>> there
>>>> is
>>>>>>> no
>>>>>>>> way to handle these issues application-side. Even KS users who
>>>>> implement
>>>>>>> KS
>>>>>>>> ProductionExceptionHandler are not able to handle the exceptions
>> as
>>>>> they
>>>>>>>> intend since the code does not reach the KS interface and breaks
>>>>>>> somewhere
>>>>>>>> in producer.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Alieh
>>>>>>>>
>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
>>>> fearthecel...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Justine,
>>>>>>>>>
>>>>>>>>> The method signatures for the interface are indeed open-ended,
>> but
>>>> the
>>>>>>>> KIP
>>>>>>>>> states that its uses will be limited. See the motivation
>> section:
>>>>>>>>>
>>>>>>>>>> We believe that the user should be able to develop custom
>>> exception
>>>>>>>>> handlers for managing producer exceptions. On the other hand,
>> this
>>>>> will
>>>>>>>> be
>>>>>>>>> an expert-level API, and using that may result in strange
>>> behaviour
>>>> in
>>>>>>>> the
>>>>>>>>> system, making it hard to find the root cause. Therefore, the
>>> custom
>>>>>>>>> handler is currently limited to handling RecordTooLargeException
>>> and
>>>>>>>>> UnknownTopicOrPartitionException.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
>>>>> <jols...@confluent.io.invalid
>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Alieh,
>>>>>>>>>>
>>>>>>>>>> I was out for KSB and then was also sick. :(
>>>>>>>>>>
>>>>>>>>>> To your point 1) Chris, I don't think it is limited to two
>>> specific
>>>>>>>>>> scenarios, since the interface accepts a generic Exception e
>> and
>>>> can
>>>>>>> be
>>>>>>>>>> implemented to check if that e is an instanceof any exception.
>> I
>>>>>>> didn't
>>>>>>>>> see
>>>>>>>>>> anywhere that specific errors are enforced. I'm a bit concerned
>>>> about
>>>>>>>>> this
>>>>>>>>>> actually. I'm concerned about the opened-endedness and the
>>> contract
>>>>>>> we
>>>>>>>>> have
>>>>>>>>>> with transactions. We are allowing the client to make decisions
>>>> that
>>>>>>>> are
>>>>>>>>>> somewhat invisible to the server. As an aside, can we build in
>>> log
>>>>>>>>> messages
>>>>>>>>>> when the handler decides to skip etc a message. I'm really
>>>> concerned
>>>>>>>>> about
>>>>>>>>>> messages being silently dropped.
>>>>>>>>>>
>>>>>>>>>> I do think Chris's point 2) about retriable vs non retriable
>>> errors
>>>>>>> is
>>>>>>>>>> fair. I'm a bit concerned about skipping a unknown topic or
>>>> partition
>>>>>>>>>> exception too early, as there are cases where it can be
>>> transient.
>>>>>>>>>>
>>>>>>>>>> I'm still a little bit wary of allowing dropping records as
>> part
>>> of
>>>>>>> EOS
>>>>>>>>>> generally as in many cases, these errors signify an issue with
>>> the
>>>>>>>>> original
>>>>>>>>>> data. I understand that streams and connect/mirror maker may
>> have
>>>>>>>> reasons
>>>>>>>>>> they want to progress past these messages, but wondering if
>> there
>>>> is
>>>>>>> a
>>>>>>>>> way
>>>>>>>>>> that can be done application-side. I'm willing to accept this
>>> sort
>>>> of
>>>>>>>>>> proposal if we can make it clear that this sort of thing is
>>>> happening
>>>>>>>> and
>>>>>>>>>> we limit the blast radius for what we can do.
>>>>>>>>>>
>>>>>>>>>> Justine
>>>>>>>>>>
>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
>>>> <chr...@aiven.io.invalid
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>
>>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some
>>>> thoughts
>>>>>>>>> that
>>>>>>>>>>> I'd like to see addressed before voting.
>>>>>>>>>>>
>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface,
>>> why
>>>>>>>> are
>>>>>>>>> we
>>>>>>>>>>> only limiting the uses for this interface to two very specific
>>>>>>>>> scenarios?
>>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled
>> as
>>>>>>> well
>>>>>>>>>>> (allowing users to drop records destined for some off-limits
>>>>>>> topics,
>>>>>>>> or
>>>>>>>>>>> retry for a limited duration in case there's a delay in the
>>>>>>>> propagation
>>>>>>>>>> of
>>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of other
>> errors
>>>>>>> that
>>>>>>>>>> could
>>>>>>>>>>> be handled with this new API, both to avoid the follow-up work
>>> of
>>>>>>>>> another
>>>>>>>>>>> KIP to address them in the future, and to make sure that we're
>>> not
>>>>>>>>>> painting
>>>>>>>>>>> ourselves into a corner with the current API in a way that
>> would
>>>>>>> make
>>>>>>>>>>> future modifications difficult.
>>>>>>>>>>>
>>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
>>> non-retriable
>>>>>>>>> errors
>>>>>>>>>>> are handled with the interface. Why not introduce two separate
>>>>>>>> methods
>>>>>>>>> to
>>>>>>>>>>> handle each case separately? That way there's no ambiguity or
>>>>>>>> implicit
>>>>>>>>>>> behavior when, e.g., attempting to retry on a
>>>>>>>> RecordTooLargeException.
>>>>>>>>>> This
>>>>>>>>>>> could be something like `NonRetriableResponse
>>>>>>> handle(ProducerRecord,
>>>>>>>>>>> Exception)` and `RetriableResponse
>>> handleRetriable(ProducerRecord,
>>>>>>>>>>> Exception)`, though the exact names and shape can obviously be
>>>>>>> toyed
>>>>>>>>>> with a
>>>>>>>>>>> bit.
>>>>>>>>>>>
>>>>>>>>>>> 3) Although the flexibility of a pluggable interface may
>> benefit
>>>>>>> some
>>>>>>>>>>> users' custom producer applications and Kafka Streams
>>>> applications,
>>>>>>>> it
>>>>>>>>>>> comes at significant deployment cost for other low-/no-code
>>>>>>>>> environments,
>>>>>>>>>>> including but not limited to Kafka Connect and MirrorMaker 2.
>>> Can
>>>>>>> we
>>>>>>>>> add
>>>>>>>>>> a
>>>>>>>>>>> default implementation of the exception handler that allows
>> for
>>>>>>> some
>>>>>>>>>> simple
>>>>>>>>>>> behavior to be tweaked via configuration property? Two things
>>> that
>>>>>>>>> would
>>>>>>>>>> be
>>>>>>>>>>> nice to have would be A) an upper bound on the retry time for
>>>>>>>>>>> unknown-topic-partition exceptions and B) an option to drop
>>>> records
>>>>>>>>> that
>>>>>>>>>>> are large enough to trigger a record-too-large exception.
>>>>>>>>>>>
>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the
>>>> proposed
>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
>>> especially
>>>>>>>> when
>>>>>>>>>>> trying to guess the behavior for retriable errors.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
>>>>>>>>>> <asae...@confluent.io.invalid
>>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> A summary of the KIP and the discussions:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The KIP introduces a handler interface for Producer in order
>> to
>>>>>>>>> handle
>>>>>>>>>>> two
>>>>>>>>>>>> exceptions: RecordTooLargeException and
>>>>>>>>>> UnknownTopicOrPartitionException.
>>>>>>>>>>>> The handler handles the exceptions per-record.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
>> sections]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer
>>>>>>> collects
>>>>>>>>>>> multiple
>>>>>>>>>>>> records in batches. Then a RecordTooLargeException related
>> to a
>>>>>>>>> single
>>>>>>>>>>>> record leads to failing the entire batch. A custom exception
>>>>>>>> handler
>>>>>>>>> in
>>>>>>>>>>>> this case may decide on dropping the record and continuing
>> the
>>>>>>>>>>> processing.
>>>>>>>>>>>> See Example 1, please. 2) More over, in Kafka Streams, a
>> record
>>>>>>>> that
>>>>>>>>> is
>>>>>>>>>>> too
>>>>>>>>>>>> large is a poison pill record, and there is no way to skip
>> over
>>>>>>>> it. A
>>>>>>>>>>>> handler would allow us to react to this error inside the
>>>>>>> producer,
>>>>>>>>>> i.e.,
>>>>>>>>>>>> local to where the error happens, and thus simplify the
>> overall
>>>>>>>> code
>>>>>>>>>>>> significantly. Please read the Motivation section for more
>>>>>>>>> explanation.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer
>>>>>>>> handles
>>>>>>>>>>> this
>>>>>>>>>>>> exception internally and only issues a WARN log about missing
>>>>>>>>> metadata
>>>>>>>>>>> and
>>>>>>>>>>>> retries internally. Later, when the producer hits "
>>>>>>>>> deliver.timeout.ms"
>>>>>>>>>>> it
>>>>>>>>>>>> throws a TimeoutException, and the user can only blindly
>> retry,
>>>>>>>> which
>>>>>>>>>> may
>>>>>>>>>>>> result in an infinite retry loop. The thrown TimeoutException
>>>>>>>> "cuts"
>>>>>>>>>> the
>>>>>>>>>>>> connection to the underlying root cause of missing metadata
>>>>>>> (which
>>>>>>>>>> could
>>>>>>>>>>>> indeed be a transient error but is persistent for a
>>> non-existing
>>>>>>>>>> topic).
>>>>>>>>>>>> Thus, there is no programmatic way to break the infinite
>> retry
>>>>>>>> loop.
>>>>>>>>>>> Kafka
>>>>>>>>>>>> Streams also blindly retries for this case, and the
>> application
>>>>>>>> gets
>>>>>>>>>>> stuck.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - Having interface vs configuration option: [Motivation,
>>>>>>> Examples,
>>>>>>>>> and
>>>>>>>>>>>> Rejected Alternatives sections]
>>>>>>>>>>>>
>>>>>>>>>>>> Our solution is introducing an interface due to the full
>>>>>>>> flexibility
>>>>>>>>>> that
>>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones,
>>>>>>>> determine
>>>>>>>>>> the
>>>>>>>>>>>> handler's behaviour based on the situation. For example, f
>>>>>>>>>>>> acing UnknownTopicOrPartitionException*, *the user may want
>> to
>>>>>>>> raise
>>>>>>>>> an
>>>>>>>>>>>> error for some topics but retry it for other topics. Having a
>>>>>>>>>>> configuration
>>>>>>>>>>>> option with a fixed set of possibilities does not serve the
>>>>>>> user's
>>>>>>>>>>>> needs. See Example 2, please.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
>> section]
>>>>>>>>>>>>
>>>>>>>>>>>> If the custom handler decides on SWALLOW for
>>>>>>>> RecordTooLargeException,
>>>>>>>>>>> then
>>>>>>>>>>>> this record will not be a part of the batch of transactions
>> and
>>>>>>>> will
>>>>>>>>>> also
>>>>>>>>>>>> not be sent to the broker in non-transactional mode. So no
>>>>>>> worries
>>>>>>>>>> about
>>>>>>>>>>>> getting a RecordTooLargeException from the broker in this
>> case,
>>>>>>> as
>>>>>>>>> the
>>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW means
>>> drop
>>>>>>>> the
>>>>>>>>>>> record
>>>>>>>>>>>> and continue/swallow the error.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - What if the handle() method implements RETRY for
>>>>>>>>>>> RecordTooLargeException?
>>>>>>>>>>>> [Proposed Changes section]
>>>>>>>>>>>>
>>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for
>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal to
>> FAIL.
>>>>>>>> This
>>>>>>>>> is
>>>>>>>>>>>> well documented/informed in javadoc.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - What if the handle() method of the handler throws an
>>> exception?
>>>>>>>>>>>>
>>>>>>>>>>>> The handler is expected to have correct code. If it throws an
>>>>>>>>>> exception,
>>>>>>>>>>>> everything fails.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846
>>>
>>>>>>> ONLY
>>>>>>>>> for
>>>>>>>>>>>> RecordTooLargeException. The code changes related to
>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR
>>> LATER.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback again.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Alieh
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
>> k...@kirktrue.pro>
>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the updates!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Comments inline...
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
>>>>>>>>>>> <asae...@confluent.io.INVALID
>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks a lot for the constructive feedbacks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Addressing some of the main concerns:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by broker,
>>>>>>>> producer
>>>>>>>>>> and
>>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
>> interface
>>>>>>>> is
>>>>>>>>>>>>> introduced
>>>>>>>>>>>>>> to affect only the exceptions thrown from the producer.
>> This
>>>>>>>> KIP
>>>>>>>>>> very
>>>>>>>>>>>>>> specifically means to provide a possibility to manage the
>>>>>>>>>>>>>> `RecordTooLargeException` thrown from the Producer.send()
>>>>>>>> method.
>>>>>>>>>>>> Please
>>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
>>>>>>> investigated
>>>>>>>>> the
>>>>>>>>>>>> issue
>>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern about
>> how
>>>>>>>> we
>>>>>>>>>>> handle
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> errors as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called
>>>>>>>> when
>>>>>>>>>> the
>>>>>>>>>>>>> record
>>>>>>>>>>>>>> sent to the server is acknowledged, while this is not the
>>>>>>>> desired
>>>>>>>>>>> time
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> all exceptions. We intend to handle exceptions beforehand.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I guess it makes sense to keep the expectation for when
>>>>>>> Callback
>>>>>>>> is
>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - What if the custom handler returns RETRY for
>>>>>>>>>>>>> `RecordTooLargeException`? I
>>>>>>>>>>>>>> assume changing the producer configuration at runtime is
>>>>>>>>> possible.
>>>>>>>>>> If
>>>>>>>>>>>> so,
>>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in the
>>>>>>> next
>>>>>>>>>> try,
>>>>>>>>>>>> the
>>>>>>>>>>>>>> too large record is not poisoning any more. I am not 100%
>>>>>>> sure
>>>>>>>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>>>> technical details, though. Otherwise, we can consider the
>>>>>>> RETRY
>>>>>>>>> as
>>>>>>>>>>> FAIL
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> this exception. Another solution would be to consider a
>>>>>>>> constant
>>>>>>>>>>> number
>>>>>>>>>>>>> of
>>>>>>>>>>>>>> times for RETRY which can be useful for other exceptions as
>>>>>>>> well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It’s not presently possible to change the configuration of
>> an
>>>>>>>>>> existing
>>>>>>>>>>>>> Producer at runtime. So if a record hits a
>>>>>>>> RecordTooLargeException
>>>>>>>>>>> once,
>>>>>>>>>>>> no
>>>>>>>>>>>>> amount of retrying (with the current Producer) will change
>>> that
>>>>>>>>> fact.
>>>>>>>>>>> So
>>>>>>>>>>>>> I’m still a little stuck on how to handle a response of
>> RETRY
>>>>>>> for
>>>>>>>>> an
>>>>>>>>>>>>> “oversized” record.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I
>>>>>>>> think
>>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be exactly
>>>>>>>> like
>>>>>>>>>> when
>>>>>>>>>>>> no
>>>>>>>>>>>>>> custom handler is defined since the user actually did not
>>>>>>> have
>>>>>>>> a
>>>>>>>>>>>> working
>>>>>>>>>>>>>> handler.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the
>> right
>>>>>>>>>> choice.
>>>>>>>>>>> It
>>>>>>>>>>>>> then becomes a silent failure that might have repercussions,
>>>>>>>>>> depending
>>>>>>>>>>> on
>>>>>>>>>>>>> the business logic. A user would have to proactively trawls
>>>>>>>> through
>>>>>>>>>> the
>>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As
>>>>>>>>>> explained
>>>>>>>>>>>> in
>>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that the
>>>>>>> handler
>>>>>>>>>> will
>>>>>>>>>>> be
>>>>>>>>>>>>>> used for a greater number of exceptions in the future.
>>>>>>>> Defining a
>>>>>>>>>>>>>> configuration parameter for each exception may make the
>>>>>>>>>>> configuration a
>>>>>>>>>>>>> bit
>>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird
>> and
>>>>>>>>>> limiting.
>>>>>>>>>>>>> Forget I ever suggested it ;)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I’d think additional background in the Motivation section
>>> would
>>>>>>>>> help
>>>>>>>>>> me
>>>>>>>>>>>>> understand how users might use this feature beyond a)
>> skipping
>>>>>>>>>>>> “oversized”
>>>>>>>>>>>>> records, and b) not retrying missing topics.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Small change:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for brevity
>>>>>>> and
>>>>>>>>>>>>> simplicity.
>>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
>>>>>>>>>>>>>
>>>>>>>>>>>>> The name change sounds good to me.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Alieh!
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I thank you all again for your useful
>> questions/suggestions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated
>> in
>>>>>>>> some
>>>>>>>>>>>>> feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Alieh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Alieh for the updates.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It
>>>>>>> seems
>>>>>>>>>> like
>>>>>>>>>>> we
>>>>>>>>>>>>> want
>>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic
>>>>>>> handler.
>>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we
>>>>>>> want
>>>>>>>>> to
>>>>>>>>>>>>> handle,
>>>>>>>>>>>>>>> but it feels a little clunky as we have a generic thing
>> for
>>>>>>>> two
>>>>>>>>>>>> specific
>>>>>>>>>>>>>>> errors.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve
>>>>>>>> these
>>>>>>>>>>>>> problems. I
>>>>>>>>>>>>>>> agree though that we will need something more than the
>> error
>>>>>>>>>> classes
>>>>>>>>>>>> I'm
>>>>>>>>>>>>>>> proposing if we want to have different handling be
>>>>>>>> configurable.
>>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means
>>>>>>> that
>>>>>>>> we
>>>>>>>>>> are
>>>>>>>>>>>>>>> creating more problems than we are solving. It is still
>>>>>>>> unclear
>>>>>>>>> to
>>>>>>>>>>> me
>>>>>>>>>>>>> how
>>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could include
>> an
>>>>>>>>>> example?
>>>>>>>>>>>> It
>>>>>>>>>>>>>>> seems like there is a specific use case in mind and maybe
>> we
>>>>>>>> can
>>>>>>>>>>> make
>>>>>>>>>>>> a
>>>>>>>>>>>>>>> design that is tighter and supports that case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <
>>>>>>> k...@kirktrue.pro>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A few questions:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it
>>>>>>>>>> generates
>>>>>>>>>>> a
>>>>>>>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY?
>>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for
>>>>>>> the
>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException since we get that
>> response
>>>>>>>>> when
>>>>>>>>>>>>>>> sending  a
>>>>>>>>>>>>>>>> batch of records?
>>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method
>>>>>>>> itself
>>>>>>>>>>>> throws
>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>> error?
>>>>>>>>>>>>>>>> K4. What is the downside of adding an onError() method to
>>>>>>> the
>>>>>>>>>>>>> Producer’s
>>>>>>>>>>>>>>>> Callback interface vs. a new mechanism?
>>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse" to
>>>>>>> just
>>>>>>>>>>>> “Response”
>>>>>>>>>>>>>>>> given that it’s an inner enum?
>>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle
>>>>>>>> different
>>>>>>>>>>>>> behavior
>>>>>>>>>>>>>>>> for different topics?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me
>>>>>>> understand
>>>>>>>>> why
>>>>>>>>>> we
>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>> another handler class if there were more examples in the
>>>>>>>>>> Motivation
>>>>>>>>>>>>>>>> section. As it stands now, I agree with Chris that the
>>>>>>> stated
>>>>>>>>>>> issues
>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>> be solved by adding two new configuration options:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>    oversized.record.behavior=fail
>>>>>>>>>>>>>>>>    retry.on.unknown.topic.or.partition=true
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what
>>>>>>> exactly
>>>>>>>>>> would
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> logic in the handler be? I’m not very imaginative, so I’m
>>>>>>>>>> assuming
>>>>>>>>>>>>> they’d
>>>>>>>>>>>>>>>> mostly be if-this-then-that. However, if they’re more
>>>>>>>>>> complicated,
>>>>>>>>>>>> I’d
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>> other concerns.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Kirk
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
>>>>>>>>>>>>> <asae...@confluent.io.INVALID
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you all for the feedback!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving the
>>>>>>>> user
>>>>>>>>>> the
>>>>>>>>>>>>>>> ability
>>>>>>>>>>>>>>>>> to handle producer exceptions, but to be more
>> conservative
>>>>>>>> and
>>>>>>>>>>> avoid
>>>>>>>>>>>>>>>> future
>>>>>>>>>>>>>>>>> issues, we decided to be limited to a short list of
>>>>>>>>> exceptions.
>>>>>>>>>> I
>>>>>>>>>>>>>>>> included
>>>>>>>>>>>>>>>>> *RecordTooLargeExceptin* and
>>>>>>>>> *UnknownTopicOrPartitionException.
>>>>>>>>>>>> *Open
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> suggestion for adding some more ;-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KIP Updates:
>>>>>>>>>>>>>>>>> - clarified the way that the user should configure the
>>>>>>>>> Producer
>>>>>>>>>> to
>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> custom handler. I think adding a producer config
>> property
>>>>>>> is
>>>>>>>>> the
>>>>>>>>>>>>>>> cleanest
>>>>>>>>>>>>>>>>> one.
>>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to
>>>>>>>>>>> *ProducerExceptionHandler*
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> closer to what we are changing.
>>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of the
>>>>>>>>>> handle()
>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and two
>>>>>>>> types
>>>>>>>>>> of
>>>>>>>>>>>>>>>> continue.
>>>>>>>>>>>>>>>>> - The default behaviour is having no custom handler,
>>>>>>> having
>>>>>>>>> the
>>>>>>>>>>>>>>>>> corresponding config parameter set to null. Therefore,
>> the
>>>>>>>> KIP
>>>>>>>>>>>>> provides
>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>> default implementation of the interface.
>>>>>>>>>>>>>>>>> - We follow the interface solution as described in the
>>>>>>>>>>>>>>>>> Rejected Alternetives section.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Alieh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
>>>>>>>>>> mj...@apache.org
>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important
>> case
>>>>>>>> for
>>>>>>>>>>> error
>>>>>>>>>>>>>>>>>> handling.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert API,
>>>>>>> as
>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>> a few people. But I don't think it would be a reason to
>>>>>>> not
>>>>>>>>> add
>>>>>>>>>>> it.
>>>>>>>>>>>>>>> It's
>>>>>>>>>>>>>>>>>> always a tricky tradeoff what to expose to users and to
>>>>>>>> avoid
>>>>>>>>>>> foot
>>>>>>>>>>>>>>> guns,
>>>>>>>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and
>> have
>>>>>>>> good
>>>>>>>>>>>>>>> experience
>>>>>>>>>>>>>>>>>> with it. Hence, I understand, but don't share the
>> concern
>>>>>>>>>> raised.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I also agree that there is some responsibility by the
>>>>>>> user
>>>>>>>> to
>>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>> how such a handler should be implemented to not drop
>> data
>>>>>>>> by
>>>>>>>>>>>>> accident.
>>>>>>>>>>>>>>>>>> But it seem unavoidable and acceptable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg
>> via
>>>>>>>>>>> configs)
>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>> also work, I personally prefer a full handler. Configs
>>>>>>> have
>>>>>>>>> the
>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>> issue that they could be miss-used potentially leading
>> to
>>>>>>>>>>>> incorrectly
>>>>>>>>>>>>>>>>>> dropped data, but at the same time are less flexible
>> (and
>>>>>>>>> thus
>>>>>>>>>>>> maybe
>>>>>>>>>>>>>>>>>> ever harder to use correctly...?). Base on my
>> experience,
>>>>>>>>> there
>>>>>>>>>>> is
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>> often weird corner case for which it make sense to also
>>>>>>>> drop
>>>>>>>>>>>> records
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> other exceptions, and a full handler has the advantage
>> of
>>>>>>>>> full
>>>>>>>>>>>>>>>>>> flexibility and "absolute power!".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the
>>>>>>>> producer
>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> details, so please keep me honest. But my understanding
>>>>>>> is,
>>>>>>>>>> that
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>> aims to allow users to react to internal exception, and
>>>>>>>>> decide
>>>>>>>>>> to
>>>>>>>>>>>>> keep
>>>>>>>>>>>>>>>>>> retrying internally, swallow the error and drop the
>>>>>>> record,
>>>>>>>>> or
>>>>>>>>>>>> raise
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> error?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more
>> precises
>>>>>>>>> what
>>>>>>>>>>>> error
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> want to cover -- I don't think this list must be
>>>>>>>> exhaustive,
>>>>>>>>> as
>>>>>>>>>>> we
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> always do follow up KIP to also apply the handler to
>>>>>>> other
>>>>>>>>>> errors
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> expand the scope of the handler. The KIP does mention
>>>>>>>>> examples,
>>>>>>>>>>> but
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>> might be good to explicitly state for what cases the
>>>>>>>> handler
>>>>>>>>>> gets
>>>>>>>>>>>>>>>> applied?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough
>>>>>>> options?
>>>>>>>>>> Don't
>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> need three options? Or would `CONTINUE` have different
>>>>>>>>> meaning
>>>>>>>>>>>>>>> depending
>>>>>>>>>>>>>>>>>> on the type of error? Ie, for a retryable error
>>>>>>> `CONTINUE`
>>>>>>>>>> would
>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>> keep retrying internally, but for a non-retryable error
>>>>>>>>>>> `CONTINUE`
>>>>>>>>>>>>>>> means
>>>>>>>>>>>>>>>>>> swallow the error and drop the record? This semantic
>>>>>>>> overload
>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>> tricky to reason about by users, so it might better to
>>>>>>>> split
>>>>>>>>>>>>>>> `CONTINUE`
>>>>>>>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better
>>>>>>>>> names).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Additionally, should we just ship a
>>>>>>>>>>> `DefaultClientExceptionHandler`
>>>>>>>>>>>>>>>>>> which would return `FAIL`, for backward compatibility.
>> Or
>>>>>>>>> don't
>>>>>>>>>>>> have
>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>>>> default handler to begin with and allow it to be
>> `null`?
>>>>>>> I
>>>>>>>>>> don't
>>>>>>>>>>>> see
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To
>> me,
>>>>>>>> the
>>>>>>>>>>> goal
>>>>>>>>>>>>>>>>>> should be to not modify the default behavior at all,
>> but
>>>>>>> to
>>>>>>>>>> just
>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>> users to change the default behavior if there is a
>> need.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What is missing on the KIP though it, how the handler
>> is
>>>>>>>>> passed
>>>>>>>>>>>> into
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> producer thought? Would we need a new config which
>> allows
>>>>>>>> to
>>>>>>>>>> set
>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> custom handler? And/or would we allow to pass in an
>>>>>>>> instance
>>>>>>>>>> via
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> constructor or add a new method to set a handler?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer
>> is
>>>>>>>>>> really
>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> ideal.
>>>>>>>>>>>>>>>>>>> It’s even harder working out what’s going on with the
>>>>>>>>>> consumer.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with
>> Chris
>>>>>>>> that
>>>>>>>>>> it
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>> with additional
>>>>>>>>>>>>>>>>>>> motivation. This would be an expert-level interface
>>>>>>> given
>>>>>>>>> how
>>>>>>>>>>>>>>>> complicated
>>>>>>>>>>>>>>>>>>> the exception handling for Kafka has become.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 7. The application is not really aware of the batching
>>>>>>>> being
>>>>>>>>>>> done
>>>>>>>>>>>> on
>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>> behalf.
>>>>>>>>>>>>>>>>>>> The ProduceResponse can actually return an array of
>>>>>>>> records
>>>>>>>>>>> which
>>>>>>>>>>>>>>>> failed
>>>>>>>>>>>>>>>>>>> per batch. If you get RecordTooLargeException, and
>> want
>>>>>>> to
>>>>>>>>>>> retry,
>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> probably
>>>>>>>>>>>>>>>>>>> need to remove the offending records from the batch
>> and
>>>>>>>>> retry
>>>>>>>>>>> it.
>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>> is getting fiddly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I
>>>>>>>>> wonder
>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>> alternative might be to add a method to the existing
>>>>>>>>> Callback
>>>>>>>>>>>>>>>> interface,
>>>>>>>>>>>>>>>>>> such as:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>  ClientExceptionResponse onException(Exception
>>>>>>> exception)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It would be called when a ProduceResponse contains an
>>>>>>>> error,
>>>>>>>>>> but
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> producer is going to retry. It tells the producer
>>>>>>> whether
>>>>>>>> to
>>>>>>>>>> go
>>>>>>>>>>>>> ahead
>>>>>>>>>>>>>>>>>> with the retry
>>>>>>>>>>>>>>>>>>> or not. The default implementation would be to
>> CONTINUE,
>>>>>>>>>> because
>>>>>>>>>>>>>>> that’s
>>>>>>>>>>>>>>>>>>> just continuing to retry as planned. Note that this
>> is a
>>>>>>>>>>>> per-record
>>>>>>>>>>>>>>>>>> callback, so
>>>>>>>>>>>>>>>>>>> the application would be able to understand which
>>>>>>> records
>>>>>>>>>>> failed.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> By using an existing interface, we already know how to
>>>>>>>>>> configure
>>>>>>>>>>>> it
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> we know
>>>>>>>>>>>>>>>>>>> about the threading model for calling it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton
>>>>>>>>>>> <chr...@aiven.io.INVALID
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! The issue with writing to
>>>>>>>> non-existent
>>>>>>>>>>> topics
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> particularly frustrating for users of Kafka Connect
>> and
>>>>>>>> has
>>>>>>>>>>> been
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>> of a handful of Jira tickets over the years. My
>>>>>>> thoughts:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 1. An additional detail we can add to the motivation
>>>>>>> (or
>>>>>>>>>>> possibly
>>>>>>>>>>>>>>>>>> rejected
>>>>>>>>>>>>>>>>>>>> alternatives) section is that this kind of custom
>> retry
>>>>>>>>> logic
>>>>>>>>>>>> can't
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> implemented by hand by, e.g., setting retries to 0 in
>>>>>>> the
>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>> config
>>>>>>>>>>>>>>>>>>>> and handling exceptions at the application level. Or
>>>>>>>>> rather,
>>>>>>>>>> it
>>>>>>>>>>>>> can,
>>>>>>>>>>>>>>>>>> but 1)
>>>>>>>>>>>>>>>>>>>> it's a bit painful to have to reimplement at every
>>>>>>>>> call-site
>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> Producer::send (and any code that awaits the returned
>>>>>>>>> Future)
>>>>>>>>>>> and
>>>>>>>>>>>>> 2)
>>>>>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>>>>> impossible to do this without losing idempotency on
>>>>>>>>> retries.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2. That said, I wonder if a pluggable interface is
>>>>>>> really
>>>>>>>>> the
>>>>>>>>>>>> right
>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>>> here. Documenting the interactions of a producer with
>>>>>>>>>>>>>>>>>>>> a ClientExceptionHandler instance will be tricky, and
>>>>>>>>>>>> implementing
>>>>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>>>>> will also be a fair amount of work. I believe that
>>>>>>> there
>>>>>>>>>> needs
>>>>>>>>>>> to
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> more granularity for how writes to non-existent
>> topics
>>>>>>>> (or
>>>>>>>>>>>> really,
>>>>>>>>>>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from
>> the
>>>>>>>>>> broker)
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> handled,
>>>>>>>>>>>>>>>>>>>> but I'm torn between keeping it simple with maybe one
>>>>>>> or
>>>>>>>>> two
>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>> config properties, or a full-blown pluggable
>> interface.
>>>>>>>> If
>>>>>>>>>>> there
>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>> cases that would benefit from a pluggable interface,
>> it
>>>>>>>>> would
>>>>>>>>>>> be
>>>>>>>>>>>>>>> nice
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> identify these and add them to the KIP to strengthen
>>>>>>> the
>>>>>>>>>>>>> motivation.
>>>>>>>>>>>>>>>>>> Right
>>>>>>>>>>>>>>>>>>>> now, I'm not sure the two that are mentioned in the
>>>>>>>>>> motivation
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>> sufficient.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 3. Alternatively, a possible compromise is for this
>> KIP
>>>>>>>> to
>>>>>>>>>>>>> introduce
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>> properties that dictate how to handle
>>>>>>>>> unknown-topic-partition
>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> record-too-large errors, with the thinking that if we
>>>>>>>>>>> introduce a
>>>>>>>>>>>>>>>>>> pluggable
>>>>>>>>>>>>>>>>>>>> interface later on, these properties will be
>> recognized
>>>>>>>> by
>>>>>>>>>> the
>>>>>>>>>>>>>>> default
>>>>>>>>>>>>>>>>>>>> implementation of that interface but could be
>>>>>>> completely
>>>>>>>>>>> ignored
>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>> replaced by alternative implementations.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 4. (Nit) You can remove the "This page is meant as a
>>>>>>>>> template
>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> writing a
>>>>>>>>>>>>>>>>>>>> KIP..." part from the KIP. It's not a template
>> anymore
>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 5. If we do go the pluggable interface route,
>> wouldn't
>>>>>>> we
>>>>>>>>>> want
>>>>>>>>>>> to
>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> possibility for retry logic? The simplest version of
>>>>>>> this
>>>>>>>>>> could
>>>>>>>>>>>> be
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> add a
>>>>>>>>>>>>>>>>>>>> RETRY value to the ClientExceptionHandlerResponse
>> enum.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead
>> of
>>>>>>>>>>>> "CONTINUE"
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> the ClientExceptionHandlerResponse enum, since they
>>>>>>> cause
>>>>>>>>>>> records
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> dropped.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I echo what Omnia says, I'm not sure I understand
>> the
>>>>>>>>>>>> implications
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> change and I think more detail is needed.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This comment also confused me a bit:
>>>>>>>>>>>>>>>>>>>>> * {@code ClientExceptionHandler} that continues the
>>>>>>>>>>> transaction
>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>> if a
>>>>>>>>>>>>>>>>>>>>> record is too large.
>>>>>>>>>>>>>>>>>>>>> * Otherwise, it makes the transaction to fail.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Relatedly, I've been working with some folks on a
>> KIP
>>>>>>>> for
>>>>>>>>>>>>>>>> transactions
>>>>>>>>>>>>>>>>>>>>> errors and how they are handled. Specifically for
>> the
>>>>>>>>>>>>>>>>>>>>> RecordTooLargeException (and a few other errors), we
>>>>>>>> want
>>>>>>>>> to
>>>>>>>>>>>> give
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>> error category for this error that allows the
>>>>>>>> application
>>>>>>>>> to
>>>>>>>>>>>>> choose
>>>>>>>>>>>>>>>>>> how it
>>>>>>>>>>>>>>>>>>>>> is handled. Maybe this KIP is something that you are
>>>>>>>>> looking
>>>>>>>>>>>> for?
>>>>>>>>>>>>>>>> Stay
>>>>>>>>>>>>>>>>>>>>> tuned :)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
>>>>>>>>>>>>>>>> o.g.h.ibra...@gmail.com
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! I have couple of comments
>>>>>>>>>>>>>>>>>>>>>> - You mentioned in the KIP motivation,
>>>>>>>>>>>>>>>>>>>>>>> Another example for which a production exception
>>>>>>>> handler
>>>>>>>>>>> could
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>> is if a user tries to write into a non-existing
>>>>>>> topic,
>>>>>>>>>> which
>>>>>>>>>>>>>>>> returns a
>>>>>>>>>>>>>>>>>>>>>> retryable error code; with infinite retries, the
>>>>>>>> producer
>>>>>>>>>>> would
>>>>>>>>>>>>>>> hang
>>>>>>>>>>>>>>>>>>>>>> retrying forever. A handler could help to break the
>>>>>>>>>> infinite
>>>>>>>>>>>>> retry
>>>>>>>>>>>>>>>>>> loop.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> How the handler can differentiate between something
>>>>>>>> that
>>>>>>>>> is
>>>>>>>>>>>>>>>> temporary
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> it should keep retrying and something permanent
>> like
>>>>>>>>> forgot
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> create
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> topic? temporary here could be
>>>>>>>>>>>>>>>>>>>>>> the producer get deployed before the topic creation
>>>>>>>>> finish
>>>>>>>>>>>>>>>> (specially
>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>> the topic creation is handled via IaC)
>>>>>>>>>>>>>>>>>>>>>> temporary offline partitions
>>>>>>>>>>>>>>>>>>>>>> leadership changing
>>>>>>>>>>>>>>>>>>>>>>       Isn’t this putting the producer at risk of
>>>>>>>> dropping
>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>>>>>>> unintentionally?
>>>>>>>>>>>>>>>>>>>>>> - Can you elaborate more on what is written in the
>>>>>>>>>>>> compatibility
>>>>>>>>>>>>> /
>>>>>>>>>>>>>>>>>>>>>> migration plan section please by explaining in bit
>>>>>>> more
>>>>>>>>>>> details
>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> changing behaviour and how this will impact client
>>>>>>> who
>>>>>>>>> are
>>>>>>>>>>>>>>>> upgrading?
>>>>>>>>>>>>>>>>>>>>>> - In the proposal changes can you elaborate in the
>>>>>>> KIP
>>>>>>>>>> where
>>>>>>>>>>> in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> producer lifecycle will ClientExceptionHandler and
>>>>>>>>>>>>>>>>>>>>>> TransactionExceptionHandler get triggered, and how
>>>>>>> will
>>>>>>>>> the
>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>> configure them to point to costumed implementation.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>> Omnia
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
>>>>>>>>>>>>>>>> <asae...@confluent.io.INVALID
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to
>>>>>>>>>> Producer.
>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I look forward to your feedback!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Alieh


Reply via email to