Hey folks,

Great discussion!

Re: throwing exceptions from send().  send() is documented to throw
KafkaException, so an application that doesn't handle it already has a
bug.  The documentation does note that API exceptions wouldn't be thrown;
I'm not sure if we have code that relies on that.  There is a reason
exceptions have classes: they are designed to express a "class of errors"
that can be handled, so that we don't have to add a flag or a new method
every time we have a new exception to throw.  But if there is consensus
that it's still too risky (especially if we have examples of code that
gets broken), then I agree that we shouldn't do it.
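
To illustrate the "class of errors" point, here is a minimal sketch.  The
exception types below are local stand-ins that only mirror the idea of a
base exception with more specific subclasses (they are not the real client
classes), and SketchProducer is hypothetical.  The only point is that a
handler written against the base class keeps working when send() starts
throwing a new subclass.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins, NOT the real Kafka classes: they only mirror the idea
// that specific errors are subclasses of one base exception.
class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message) { super(message); }
}

class SketchRecordTooLargeException extends SketchKafkaException {
    SketchRecordTooLargeException(String message) { super(message); }
}

// Hypothetical producer whose send() throws synchronously for big records.
class SketchProducer {
    private final int maxChars;
    final List<String> accepted = new ArrayList<>();

    SketchProducer(int maxChars) { this.maxChars = maxChars; }

    void send(String record) {
        if (record.length() > maxChars) {
            throw new SketchRecordTooLargeException(
                "record of " + record.length() + " chars exceeds " + maxChars);
        }
        accepted.add(record);
    }
}

class ExceptionClassSketch {
    // Application code written against the base class only.
    static String trySend(SketchProducer producer, String record) {
        try {
            producer.send(record);
            return "sent";
        } catch (SketchKafkaException e) {
            // Also catches subclasses introduced later; no new flag needed.
            return "handled: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer(8);
        System.out.println(trySend(producer, "small"));
        System.out.println(trySend(producer, "far too large a record"));
    }
}
```

Code that only catches a narrower type, or nothing at all, is the case
that could break, which is the compatibility risk mentioned above.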

Re: various ways to communicate the semantics change.  If we must have 2
different behaviors, I think passing an "ignore errors" option to
commitTransaction is probably the most intuitive way to do it.  What I
really don't like (in any of the options) is that we cannot really
document it in a way that articulates a value in the product.  There are
tons of nuances that require understanding some buggy behavior, then the
fixed behavior, then an option to sometimes turn the buggy behavior back
on, and so on.

>  if a user invokes Producer::abortTransaction from within a producer
>  callback today

I think we would get an invalid state exception.  We could probably fix
that, but even if we supported it, I think it would be good if doing send +
commit led to an aborted transaction without special action from the
application -- the simple things should be really simple: any failure
during send or commit should abort the send + commit sequence without
special handling.
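
As a sketch of one possible reading of "really simple" from the
application side: the TxProducer interface and the in-memory FlakyProducer
below are hypothetical stand-ins (not the KafkaProducer API).  The
application never inspects futures or callbacks; a single generic catch
block is all the error handling it writes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal view of a transactional producer; not the real API.
interface TxProducer {
    void beginTransaction();
    void send(String record);      // may throw on a bad record
    void commitTransaction();      // may throw if the commit cannot proceed
    void abortTransaction();
}

// In-memory stand-in that rejects records starting with "bad".
class FlakyProducer implements TxProducer {
    final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    int aborts = 0;

    public void beginTransaction() { pending.clear(); }

    public void send(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalArgumentException("rejected: " + record);
        }
        pending.add(record);
    }

    public void commitTransaction() {
        committed.addAll(pending);
        pending.clear();
    }

    public void abortTransaction() {
        pending.clear();
        aborts++;
    }
}

class SendCommitSketch {
    // Returns true if the batch committed, false if it was aborted.
    static boolean sendAll(TxProducer producer, List<String> records) {
        producer.beginTransaction();
        try {
            for (String record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // Any failure in send or commit aborts the whole sequence.
            producer.abortTransaction();
            return false;
        }
    }

    public static void main(String[] args) {
        FlakyProducer producer = new FlakyProducer();
        System.out.println(sendAll(producer, List.of("a", "b")));
        System.out.println(sendAll(producer, List.of("c", "bad-record", "d")));
        System.out.println(producer.committed);
    }
}
```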

-Artem

On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com>
wrote:

> One quick thought: if a user invokes Producer::abortTransaction from within
> a producer callback today, even in the midst of an ongoing call to
> Producer::commitTransaction, what is the behavior? Would it be reasonable
> to support this behavior as a way to allow error handling to take place
> during implicit flushes, via producer callback?
>
> On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > My point is that it does not seem to be safe to allow users to ignore
> > errors with an implicit flush, and I think it's better to only allow it
> > with (ie, after) an explicit flush().
> >
> > My reasoning is, that users should make a decision to ignore errors or
> > not, before calling `commitTx()`, but after inspecting all potential
> > send errors. With an implicit flush, users need to "blindly" decide to
> > ignore send errors, because there are pending sends and potential errors
> > are not known yet, when calling `commitTx()`.
> >
> >
> >
> > > In the documentation of commitTransaction, we say if any send throws an
> > > error, commitTransaction will too.
> >
> > Yes. And I think we should keep it this way for an implicit flush. With
> > an explicit flush, `commitTransaction()` cannot encounter any send
> > errors any longer.
> >
> >
> >
> > > It says that all callbacks will be executed, but we ignore the errors
> of
> > > the callbacks.
> >
> > Ah. Thanks for pointing this out. For this case it's even worse (for
> > case (2)), because the user cannot inspect any errors and make any
> > decision to ignore or not during an implicit flush...
> >
> >
> >
> > > We shouldn't be relying on errors in the callback unless we are
> > > calling flush, which we can still do. It seems this has always been the
> > > case as well.
> >
> > Yes, has always been this way, and my point is to keep it this way
> > (option (2) would change it), and not start to allow to ignore errors
> > with an implicit flush.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 6/24/24 4:57 PM, Justine Olshan wrote:
> > > Transaction verification is a concept from KIP-890 referring to the
> > > verification that a partition has been added to the transaction. It's
> > not a
> > > huge deal, but maybe we don't want to overload the terminology.
> > >
> > > For option 2, I was a little confused by this
> > >
> > >>    when commitTx is called, there is still pending Futures and not
> > > all Callbacks are executed yet -- with the implicit flush, we know that
> > > all Callbacks are executed, but even for this case, the user could only
> > > throw an exception inside the Callback to stop the TX to eventually
> > > commit -- Futures cannot be used to make a decision to ignore error and
> > > commit or not.
> > >
> > > In the documentation of commitTransaction, we say if any send throws an
> > > error, commitTransaction will too.
> > >
> > > *Further, if any of the {@link #send(ProducerRecord)} calls which were
> > part
> > > of the transaction hit irrecoverable errors, this method will throw the
> > > last received exception immediately and the transaction will not be
> > > committed.*
> > >
> > > It says that all callbacks will be executed, but we ignore the errors
> of
> > > the callbacks.
> > >
> > > *If the transaction is committed successfully and this method returns
> > > without throwing an exception, it is guaranteed that all {@link
> Callback
> > > callbacks} for records in the transaction will have been invoked and
> > > completed. Note that exceptions thrown by callbacks are ignored; the
> > > producer proceeds to commit the transaction in any case.*
> > >
> > > Is it fair to say though that for the send errors, we can choose to
> > ignore
> > > them? I wasn't understanding where the callbacks come in with your
> > > comment. We shouldn't be relying on errors in the callback unless we
> are
> > > calling flush, which we can still do. It seems this has always been the
> > > case as well.
> > >
> > > Justine
> > >
> > > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <
> > andrew_schofi...@live.com>
> > > wrote:
> > >
> > >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with
> 3a
> > as
> > >> the way.
> > >>
> > >> I suggest “TRANSACTION VERIFIED”.
> > >>
> > >> There isn’t really precedent for options in the producer API. We could
> > use
> > >> an enum,
> > >> which is easy to use and not very future-proof. Or we could use a
> class
> > >> like the
> > >> admin API does, which is cumbersome and flexible.
> > >>
> > >>    CommitTransactionOptions.TRANSACTION_VERIFIED
> > >>
> > >> or
> > >>
> > >>    public class CommitTransactionOptions {
> > >>      public CommitTransactionOptions();
> > >>
> > >>      CommitTransactionOptions transactionVerified(boolean
> > >> transactionVerified);
> > >>
> > >>      boolean transactionVerified();
> > >>    }
> > >>
> > >>
> > >> Then 3b is:
> > >>
> > >>     send(…)
> > >>     send(…)
> > >>     flush()
> > >>     commitTransaction(new
> > >> CommitTransactionOptions().transactionVerified(true))
> > >>
> > >>
> > >> I’d tend towards the enum here because I doubt we need as much
> > flexibility
> > >> as the admin API requires.
> > >>
> > >> Thanks,
> > >> Andrew
> > >>
> > >>
> > >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
> > >>>
> > >>> I am ok either way (ie, flush or commit), but I think we need to
> define
> > >> exact semantics, and I think there is some subtle thing to consider:
> > >>>
> > >>>
> > >>>
> > >>> 1) flush(Options)
> > >>>
> > >>> Example:
> > >>>
> > >>>   send(...)
> > >>>   send(...)
> > >>>
> > >>>   flush(ignoreErrors)
> > >>>
> > >>>   // at this point, we know that all Futures are completed and all
> > >> Callbacks are executed, and we can assume that all user code checking
> > for
> > >> errors did execute, before `commitTx` is called
> > >>>
> > >>>   // I consider this option as safe
> > >>>
> > >>>   commitTx()
> > >>>
> > >>>
> > >>> 2) commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>   send(...)
> > >>>   send(...)
> > >>>
> > >>>   // when commitTx is called, there is still pending Futures and not
> > all
> > >> Callbacks are executed yet -- with the implicit flush, we know that
> all
> > >> Callbacks are executed, but even for this case, the user could only
> > throw
> > >> an exception inside the Callback to stop the TX to eventually commit
> --
> > >> Futures cannot be used to make a decision to ignore error and commit
> or
> > not.
> > >>>
> > >>>   // I consider this option not as safe
> > >>>
> > >>>   commitTx(ignoreErrors)
> > >>>
> > >>>
> > >>>
> > >>> 3a) required flush + commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>   send(...)
> > >>>   send(...)
> > >>>
> > >>>   flush()
> > >>>
> > >>>   // at this point, we know that all Futures are completed and all
> > >> Callbacks are executed, and we can assume that all user code checking
> > for
> > >> error did execute, before `commitTx` is called
> > >>>
> > >>>   // I consider this option as safe
> > >>>
> > >>>   commitTx(ignoreErrors)
> > >>>
> > >>>
> > >>> 3b) missing flush + commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>   send(...)
> > >>>   send(...)
> > >>>
> > >>>   // as flush() was not called explicitly, we should ignore
> > >> `ignoreErrors` flag and always throw an exception if the producer is
> in
> > >> error state, because we cannot be sure that the user did all required
> > check
> > >> for error handling
> > >>>
> > >>>   commitTx(ignoreErrors)
> > >>>
> > >>>
> > >>>
> > >>> The only issue with option (3) is, that it's more complex and
> semantics
> > >> are more subtle. But it might be a good (necessary?) bridge
> between
> > (1)
> > >> and (2): (3) is semantically sound (we ignore errors via passing a
> flag
> > >> into commitTx() instead of flush()), and at the same time safe (we
> force
> > >> users to explicitly flush() and [hopefully] do proper error handling,
> > and
> > >> don't rely on an implicit flush() during commitTx() which might be
> error
> > >> prone).
> > >>>
> > >>> (Also need to find a good and descriptive name for the flag we pass
> > into
> > >> `commitTx()` for this case.)
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> > >>>> Hi Chris,
> > >>>> That works for me too. I slightly prefer an option on flush(), but
> > what
> > >> you suggested
> > >>>> works too.
> > >>>> Thanks,
> > >>>> Andrew
> > >>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID>
> > >> wrote:
> > >>>>>
> > >>>>> Hi Andrew,
> > >>>>>
> > >>>>> I like a lot of what you said, but I still believe it's better to
> > >> override
> > >>>>> commitTransaction than flush. Users will already have to manually
> opt
> > >> in to
> > >>>>> ignoring errors encountered during transactions, and we can
> document
> > >>>>> recommended usage (i.e., explicitly invoking flush() before
> invoking
> > >>>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced
> > method.
> > >> I
> > >>>>> don't believe it's worth the increased cognitive load on users with
> > >>>>> non-transactional producers to introduce an overloaded flush()
> > variant.
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Chris
> > >>>>>
> > >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <
> > >> andrew_schofi...@live.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Thanks for driving this. Unfortunately, there are many parts of
> the
> > >> API
> > >>>>>> which
> > >>>>>> are a bit unfortunate and it’s tricky to make small improvements
> > that
> > >>>>>> don’t have
> > >>>>>> downsides.
> > >>>>>>
> > >>>>>> I don’t like the idea of using a configuration because
> configuration
> > >> is
> > >>>>>> often
> > >>>>>> outside the application and changing the behaviour of someone
> else’s
> > >>>>>> application
> > >>>>>> without understanding it is risky. Anything which embeds a
> > >> transactional
> > >>>>>> producer
> > >>>>>> could have its behaviour changed unexpectedly.
> > >>>>>>
> > >>>>>> It would have been much nicer if send() didn’t fail silently and
> > change
> > >> the
> > >>>>>> transaction
> > >>>>>> state. But, because it’s an asynchronous operation, I don’t really
> > >> think
> > >>>>>> we can
> > >>>>>> just make it throw all exceptions, even though I really think that
> > >>>>>> `send()` is the
> > >>>>>> method with the problem here.
> > >>>>>>
> > >>>>>> The contract of `flush()` is that it makes sure that all preceding
> > >> sends
> > >>>>>> will have
> > >>>>>> completed, so it should be true that a well written application
> > would
> > >> be
> > >>>>>> able to
> > >>>>>> know which records were OK because of the Future<RecordMetadata>
> > >> returned
> > >>>>>> by the `send()` method. It should be able to determine whether it
> > >> wants to
> > >>>>>> commit
> > >>>>>> the transaction even if some of the intended operations didn’t
> > >> succeed.
> > >>>>>>
> > >>>>>> What we don’t currently have is a way for the application to say
> to
> > >> the
> > >>>>>> KafkaProducer
> > >>>>>> that it knows the outcome of sending the records and to confirm
> that
> > >> it
> > >>>>>> wants to proceed.
> > >>>>>> Then it would not be necessary for `commitTransaction()` to throw
> an
> > >>>>>> exception to
> > >>>>>> report a historical error which the application might choose to
> > >> ignore.
> > >>>>>>
> > >>>>>> Having read the comments, I think the KIP is on the right lines
> > >> focusing
> > >>>>>> on the `flush()`
> > >>>>>> method. My suggestion is that we introduce an option on `flush()`
> to
> > >> be
> > >>>>>> used before
> > >>>>>> `commitTransaction()` for applications that want to be able to
> > commit
> > >>>>>> transactions which
> > >>>>>> had known failed operations.
> > >>>>>>
> > >>>>>> The code would be:
> > >>>>>>
> > >>>>>>    producer.beginTransaction();
> > >>>>>>
> > >>>>>>    future1 = producer.send(goodRecord1);
> > >>>>>>    future2 = producer.send(badRecord); // The future from this
> call
> > >> will
> > >>>>>> complete exceptionally
> > >>>>>>    future3 = producer.send(goodRecord2);
> > >>>>>>
> > >>>>>>    producer.flush(FlushOption.TRANSACTION_READY);
> > >>>>>>
> > >>>>>>    // At this point, we know that all 3 futures are complete and
> the
> > >>>>>> transaction contains 2 records
> > >>>>>>    producer.commitTransaction();
> > >>>>>>
> > >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses the
> > >> default
> > >>>>>> option which behaves
> > >>>>>> like today.
> > >>>>>>
> > >>>>>> Why did I suggest an option on `flush()` rather than
> > >>>>>> `commitTransaction()`? Because with
> > >>>>>> `flush()`, it’s clear when the application is stating that it’s
> seen
> > >> all
> > >>>>>> of the results from its
> > >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on
> > >> flushing
> > >>>>>> that occurs inside
> > >>>>>> `commitTransaction()`, I don’t see it’s as clear-cut.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi
> > <asae...@confluent.io.INVALID
> > >>>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> Thanks for the interesting discussion.
> > >>>>>>>
> > >>>>>>> I assume that now the main questions are as follows:
> > >>>>>>>
> > >>>>>>> 1. Do we need to transition the transaction to the error state for
> API
> > >>>>>>> exceptions?
> > >>>>>>> 2. Should we throw the API exception in `send()` instead of
> > >> returning a
> > >>>>>>> future error?
> > >>>>>>> 3. If the answer to question (1) is NO and to question (2) is
> YES,
> > >> do we
> > >>>>>>> need to change the current `flush` or `commitTnx` at all?
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Alieh
> > >>>>>>>
> > >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <
> mj...@apache.org>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hey Kirk,
> > >>>>>>>>
> > >>>>>>>> can you elaborate on a few points?
> > >>>>>>>>
> > >>>>>>>>> Otherwise users would have to know to explicitly change their
> > code
> > >> to
> > >>>>>>>> invoke flush().
> > >>>>>>>>
> > >>>>>>>> Why? If we would add an option to `flush(FlushOption)`, the
> > existing
> > >>>>>>>> `flush()` w/o any option will still be there, right? If we would
> > >> really
> > >>>>>>>> deprecate existing `flush()`, it would just mean that we would
> > pass
> > >>>>>>>> "default FlushOption" into an implicit flush (and yes, we would
> > >> need to
> > >>>>>>>> define what this would be).
> > >>>>>>>>
> > >>>>>>>> I think there is no clear winner (as pointed out in my last
> > reply),
> > >> and
> > >>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` have
> > >> advantages
> > >>>>>>>> and drawbacks. Guess we need to just agree on which tradeoff we
> > >> want to
> > >>>>>>>> move forward with?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Not sure if your database example is a 1:1 fit? I think, the
> > better
> > >>>>>>>> comparison would be:
> > >>>>>>>>
> > >>>>>>>> BEGIN TX;
> > >>>>>>>> INSERT INTO foo VALUES ('a');
> > >>>>>>>> INSERT INTO foo VALUES ('b');
> > >>>>>>>> INSERT INTO foo VALUES ('c');
> > >>>>>>>> INSERT INTO foo VALUES ('not sure');
> > >>>>>>>>
> > >>>>>>>> For this case, the full TX would roll back, right? I still think
> > >> that
> > >>>>>>>> allowing users to just skip over the last error, and continue
> the
> > TX
> > >>>>>>>> would be ok. In the end, we provide a programmatic API, and not
> a
> > >>>>>>>> declarative one as SQL. Of course, default behavior would still
> be
> > >> to
> > >>>>>>>> put the producer into error state, and the user would need to
> call
> > >>>>>>>> `abortTransaction()` to move forward.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> > >>>>>>>>> Hi Matthias,
> > >>>>>>>>>
> > >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <
> mj...@apache.org
> > >
> > >>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> If we want to limit it to `RecordTooLargeException` throwing
> > from
> > >>>>>>>> `send()` directly makes sense. Thanks for calling it out.
> > >>>>>>>>>>
> > >>>>>>>>>> It's still a question of backward compatibility? `send()` does
> > >> throw
> > >>>>>>>> exceptions already, including generic `KafkaException`. Not sure
> > if
> > >> this
> > >>>>>>>> helps with backward compatibility? Could we just add a new
> > exception
> > >>>>>> type
> > >>>>>>>> (which is a child of `KafkaException`)?
> > >>>>>>>>>>
> > >>>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
> > >>>>>>>>>>
> > >>>>>>>>>> I think we could expect that some generic error handling path
> > gets
> > >>>>>>>> executed. For the TX-case, I would assume that a TX would be
> > >> aborted if
> > >>>>>>>> `send()` throws or that the producer would be `closed()`.
> Overall
> > >> this
> > >>>>>>>> might be safe?
> > >>>>>>>>>>
> > >>>>>>>>>>> It would be a little less flexible
> > >>>>>>>>>>>> though, since (as you note) it would still be impossible to
> > >> commit
> > >>>>>>>>>>>> transactions after errors have been reported from brokers.
> > >>>>>>>>>>
> > >>>>>>>>>> KS would still need a way to clear the error state of the
> > >> producer. We
> > >>>>>>>> could catch a `RecordTooLargeException` from `send()`, call the
> > >> handler
> > >>>>>> and
> > >>>>>>>> let it decide what to do next. But if it does return `CONTINUE`
> to
> > >>>>>> swallow
> > >>>>>>>> the error and drop the poison pill record on the floor, we would
> > >> want to
> > >>>>>>>> move forward and commit the transaction.
> > >>>>>>>>>>
> > >>>>>>>>>> But the question is: if we cannot add a record to the tx, does
> > the
> > >>>>>>>> producer need to go into error state? In the end, we did throw
> and
> > >>>>>> inform
> > >>>>>>>> the app that the record was _not_ added, and it's up to the app
> to
> > >>>>>> decide
> > >>>>>>>> what to do next?
> > >>>>>>>>>
> > >>>>>>>>> That’s an excellent question…
> > >>>>>>>>>
> > >>>>>>>>> Imagine the user’s application is writing information to a
> > database
> > >>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and
> > this
> > >> SQL
> > >>>>>>>> statement was attempted, what should happen?
> > >>>>>>>>>
> > >>>>>>>>>     INSERT INTO foo VALUES ('not sure');
> > >>>>>>>>>
> > >>>>>>>>> Yes, that DML would fail, sure, but would the user expect that
> > the
> > >>>>>>>> connection used by database library would get stuck in some kind
> > of
> > >>>>>> error
> > >>>>>>>> state? A user would be able to catch the error and either continue
> or
> > >>>>>> abort,
> > >>>>>>>> based on their business rules.
> > >>>>>>>>>
> > >>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t
> > >> poison the
> > >>>>>>>> Producer/TransactionManager on certain types of
> application-level
> > >>>>>> errors in
> > >>>>>>>> send().
> > >>>>>>>>>
> > >>>>>>>>> Kirk
> > >>>>>>>>>
> > >>>>>>>>>> If we report the error only via the `Callback` it's a
> different
> > >> story,
> > >>>>>>>> because the contract for this case is clearly specified on the
> > >> JavaDocs:
> > >>>>>>>>>>
> > >>>>>>>>>>> When used as part of a transaction, it is not necessary to
> > >> define a
> > >>>>>>>> callback or check the result of the future
> > >>>>>>>>>>> in order to detect errors from <code>send</code>. If any of
> the
> > >> send
> > >>>>>>>> calls failed with an irrecoverable error,
> > >>>>>>>>>>> the final {@link #commitTransaction()} call will fail and
> throw
> > >> the
> > >>>>>>>> exception from the last failed send. When
> > >>>>>>>>>>> this happens, your application should call {@link
> > >>>>>> #abortTransaction()}
> > >>>>>>>> to reset the state and continue to send
> > >>>>>>>>>>> data.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> > >>>>>>>>>>> Hi Artem,
> > >>>>>>>>>>> I think it'd make sense to throw directly from send whenever
> > >>>>>> possible,
> > >>>>>>>>>>> instead of returning an already-completed future. I didn't do
> > >> that in
> > >>>>>>>> my
> > >>>>>>>>>>> bug fix to try to be conservative about breaking changes but
> > this
> > >>>>>>>> seems to
> > >>>>>>>>>>> have caused its own set of headaches. It would be a little
> less
> > >>>>>>>> flexible
> > >>>>>>>>>>> though, since (as you note) it would still be impossible to
> > >> commit
> > >>>>>>>>>>> transactions after errors have been reported from brokers.
> > >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that
> > >>>>>>>> flexibility
> > >>>>>>>>>>> is required. If it is, then users could explicitly call
> flush()
> > >>>>>> before
> > >>>>>>>>>>> committing (and ignoring errors for) or aborting a
> transaction,
> > >> if
> > >>>>>> they
> > >>>>>>>>>>> want to implement fine-grained error handling logic such as
> > >> allowing
> > >>>>>>>> errors
> > >>>>>>>>>>> for a subset of topics to be ignored.
> > >>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>> Most of the time you're right and we can't throw from send();
> > >>>>>> however,
> > >>>>>>>> in
> > >>>>>>>>>>> this case (client-side record-too-large exception), the error
> > is
> > >>>>>>>> actually
> > >>>>>>>>>>> noticed by the producer before send() returns, so it should
> be
> > >>>>>>>> possible to
> > >>>>>>>>>>> throw directly.
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Chris
> > >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <
> mj...@apache.org>
> > >>>>>> wrote:
> > >>>>>>>>>>>> Not sure if we can change send and make it throw, given that
> > >> send()
> > >>>>>> is
> > >>>>>>>>>>>> async? That is why users can register a `Callback` to begin
> > >> with,
> > >>>>>>>> right?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> And Alieh's point about backward compatibility is also a
> fair
> > >>>>>> concern.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Actually, this would potentially be even
> > >>>>>>>>>>>>> worse than the original buggy behavior because the bug was
> > >> that we
> > >>>>>>>>>>>> ignored
> > >>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> > >> necessarily
> > >>>>>>>> the
> > >>>>>>>>>>>>> ones that we got from the broker.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would
> only
> > >>>>>> swallow
> > >>>>>>>>>>>> `send()` errors, not errors about the actually commit. I
> agree
> > >> that
> > >>>>>> it
> > >>>>>>>>>>>> would be very bad to swallow errors about the actual tx
> > >> commit...
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It's a fair question if this might be too subtle; to make it
> > >>>>>> explicit,
> > >>>>>>>>>>>> we could use `CommitOptions#ignorePendingSendErrors()`
> [working
> > >> name]
> > >>>>>> to
> > >>>>>>>>>>>> make it clear.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If we think it's too subtle to change commit to swallow
> send()
> > >>>>>> errors,
> > >>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer (and
> > we
> > >> can
> > >>>>>>>> use
> > >>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working
> > name]
> > >> to
> > >>>>>> be
> > >>>>>>>>>>>> explicit that the `FlushOption` for now has only an effect
> > for
> > >>>>>> TX).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> > >>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> It is very exciting to see all the experts here raising
> very
> > >> good
> > >>>>>>>> points.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> As we go further, we see more and more options to improve
> our
> > >>>>>>>> solution,
> > >>>>>>>>>>>>> which makes concluding and updating the KIP impossible.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The main suggestions so far are:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3. `send` must throw the exception
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> My concern about the 3rd suggestion:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1. Does the change cause any issue with backward
> > compatibility?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2. The `send (bad record)` already transits the transaction
> > to
> > >> the
> > >>>>>>>> error
> > >>>>>>>>>>>>> state. No user, including Streams is able to transit the
> > >>>>>> transaction
> > >>>>>>>> back
> > >>>>>>>>>>>>> from the error state. Do you mean we remove the
> > >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
> > >>>>>>>>>>>>> <
> > >>>>>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>
> >
> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> as well?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
> > >>>>>>>>>>>> andrew_schofi...@live.com>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Artem,
> > >>>>>>>>>>>>>> I think you make a good point which is worth further
> > >>>>>> consideration.
> > >>>>>>>> If
> > >>>>>>>>>>>>>> any of the existing methods is really ripe for a change
> > here,
> > >> it’s
> > >>>>>>>> the
> > >>>>>>>>>>>>>> send() that actually caused the problem. If that can be
> > fixed
> > >> so
> > >>>>>>>> there
> > >>>>>>>>>>>> are
> > >>>>>>>>>>>>>> no situations in which a lurking error breaks a
> transaction,
> > >> that
> > >>>>>>>> might
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>> the best.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>> Andrew
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <
> > >> alivsh...@confluent.io
> > >>>>>>>>>>>> .INVALID>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I thought we still wait for requests (and their errors)
> to
> > >> come
> > >>>>>>>> in and
> > >>>>>>>>>>>>>>> could handle fatal errors appropriately.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that
> when
> > >>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to ignore
> > >> errors.
> > >>>>>>>> So
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1. send
> > >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> the commit will succeed.  You can look at the example in
> > >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and
> just
> > >>>>>>>> substitute
> > >>>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore send
> > >> errors")
> > >>>>>> and
> > >>>>>>>> we
> > >>>>>>>>>>>> get
> > >>>>>>>>>>>>>>> the buggy behavior back :-).  Actually, this would
> > >> potentially be
> > >>>>>>>> even
> > >>>>>>>>>>>>>>> worse than the original buggy behavior because the bug
> was
> > >> that
> > >>>>>> we
> > >>>>>>>>>>>>>> ignored
> > >>>>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> > >>>>>>>> necessarily the
> > >>>>>>>>>>>>>>> ones that we got from the broker.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Actually, looking at
> > >>>>>>>> https://github.com/apache/kafka/pull/11508/files,
> > >>>>>>>>>>>>>>> wouldn't a better solution be to just throw the error
> from
> > >> the
> > >>>>>>>> "send"
> > >>>>>>>>>>>>>>> method itself, rather than trying to set it to be thrown
> > >> during
> > >>>>>>>> commit?
> > >>>>>>>>>>>>>>> This way the example in
> > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> > >>>>>>>>>>>>>>> would be fixed, and at the same time it would give an
> > >> opportunity
> > >>>>>>>> for
> > >>>>>>>>>>>> KS
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> catch the error and ignore it if needed.  Not sure if we
> > >> need a
> > >>>>>>>> KIP for
> > >>>>>>>>>>>>>>> that, just do a better fix of the old bug.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
> > >>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here
> looks
> > >>>>>>>> reasonable.
> > >>>>>>>>>>>>>>>> Moving the logic to a transactional method makes sense
> to
> > >> me and
> > >>>>>>>> makes
> > >>>>>>>>>>>>>> me
> > >>>>>>>>>>>>>>>> feel a bit better about keeping the complexity in the
> > >> methods
> > >>>>>>>> relevant
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> the issue.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send
> > >>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> without
> > >>>>>> explicit
> > >>>>>>>>>>>>>> flush,
> > >>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> application
> > >> won't
> > >>>>>> be
> > >>>>>>>>>>>> able
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Is this with respect to the case a request is still
> > inflight
> > >>>>>> when
> > >>>>>>>> we
> > >>>>>>>>>>>>>> call
> > >>>>>>>>>>>>>>>> commitTransaction? I thought we still wait for requests
> > (and
> > >>>>>> their
> > >>>>>>>>>>>>>> errors)
> > >>>>>>>>>>>>>>>> to come in and could handle fatal errors appropriately.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
> > >>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar
> > could
> > >> be a
> > >>>>>>>> good
> > >>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>> forward?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I like this approach.  One minor concern is that if we
> > set
> > >>>>>>>> "ignore
> > >>>>>>>>>>>> send
> > >>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> without
> > >>>>>>>> explicit
> > >>>>>>>>>>>>>> flush,
> > >>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> application
> > >> won't
> > >>>>>>>> be
> > >>>>>>>>>>>> able
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
> But
> > I
> > >>>>>> guess
> > >>>>>>>>>>>> we'll
> > >>>>>>>>>>>>>>>> just
> > >>>>>>>>>>>>>>>>> have to clearly document it.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> In some way we are basically adding a flag to
> optionally
> > >>>>>> restore
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug,
> > >> which is
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>> motivation for all these changes, anyway :-).
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
> > >>>>>>>> mj...@apache.org>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of
> > >>>>>> support.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> So we need to go with some form of "overload / new
> > >> method". I
> > >>>>>>>> think
> > >>>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but
> > rather
> > >>>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one; for
> > >> non-tx
> > >>>>>>>> case,
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> different flush variants would not make sense.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options"
> > >> object, so
> > >>>>>>>> maybe
> > >>>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could
> be a
> > >> good
> > >>>>>>>> way
> > >>>>>>>>>>>>>>>>>> forward? It's much better than a `boolean` parameter,
> > >>>>>>>> aesthetically,
> > >>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>> well as extendable in the future if necessary.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we
> > >> might
> > >>>>>> not
> > >>>>>>>> even
> > >>>>>>>>>>>>>>>>>> need to deprecate the existing `commitTransaction()`
> > >> method?
> > >>>>>>>>>>>>>>>>>>
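To make the options-object idea concrete, here is a minimal sketch. It
assumes a CommitOptions holder with a single "ignore send errors" switch;
the thread only names the class, so the field, its name, and the toy
SketchProducer are hypothetical and are not the Kafka client API. It shows
how the existing no-arg commitTransaction() could keep its behavior by
delegating to default options, which is why deprecation may not be needed.

```java
public class CommitOptionsSketch {

    /** Hypothetical options holder; only the class name comes from the thread. */
    static final class CommitOptions {
        // Assumed default: strict behavior, send errors fail the commit.
        private boolean ignoreSendErrors = false;

        CommitOptions ignoreSendErrors(boolean value) {
            this.ignoreSendErrors = value;
            return this;
        }

        boolean ignoreSendErrors() {
            return ignoreSendErrors;
        }
    }

    /** Toy stand-in for a transactional producer; NOT the Kafka client. */
    static class SketchProducer {
        private boolean hasSendError;

        /** Simulates a failed send() whose error is still pending. */
        void recordSendError() {
            hasSendError = true;
        }

        /** Existing method: unchanged behavior via default options. */
        boolean commitTransaction() {
            return commitTransaction(new CommitOptions());
        }

        /** Returns true on commit; throws if an error is pending and not ignored. */
        boolean commitTransaction(CommitOptions options) {
            if (hasSendError && !options.ignoreSendErrors()) {
                throw new IllegalStateException("pending send error");
            }
            hasSendError = false;
            return true;
        }
    }

    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer();
        producer.recordSendError();
        // Caller opts in explicitly, at the call site, to tolerating send errors.
        System.out.println(
            producer.commitTransaction(new CommitOptions().ignoreSendErrors(true)));
    }
}
```

A new knob would become another field on CommitOptions rather than another
overload, which is the extensibility argument made above.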
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes
> the
> > >>>>>>>> behaviour
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> flush() method. We already have too many configs.
> But I
> > >>>>>> totally
> > >>>>>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>>> the problem that you’re trying to solve and some of
> the
> > >> other
> > >>>>>>>>>>>>>>>>> suggestions
> > >>>>>>>>>>>>>>>>>>> in this thread seem neater.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Personally, I would add another method to
> > KafkaProducer.
> > >> Not
> > >>>>>> an
> > >>>>>>>>>>>>>>>>> overload
> > >>>>>>>>>>>>>>>>>>> on flush() because this is not flush() at all. Using
> > >>>>>> Matthias’s
> > >>>>>>>>>>>>>>>>> options,
> > >>>>>>>>>>>>>>>>>>> I prefer (3).
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>> Andrew
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <
> > liane...@gmail.com
> > >>>
> > >>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the
> config
> > >> not
> > >>>>>>>> being
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> most
> > >>>>>>>>>>>>>>>>>>>> explicit/flexible way to express this capability.
> > >> Getting
> > >>>>>>>> then to
> > >>>>>>>>>>>>>>>>>> Matthias
> > >>>>>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is that
> it
> > >> seems
> > >>>>>>>> they
> > >>>>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting some
> > other
> > >>>>>> twist
> > >>>>>>>> to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>>>> semantics that will have us adding yet another param
> > to
> > >> it,
> > >>>>>> or
> > >>>>>>>>>>>>>>>> another
> > >>>>>>>>>>>>>>>>>>>> overloaded method? I truly don't have the context to
> > >> answer
> > >>>>>>>> that,
> > >>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>> if it
> > >>>>>>>>>>>>>>>>>>>> feels like a realistic future maybe adding some kind of
> > >>>>>>>> FlushOptions
> > >>>>>>>>>>>>>>>>>> params to
> > >>>>>>>>>>>>>>>>>>>> the flush would be better from an extensibility
> point
> > of
> > >>>>>>>> view. It
> > >>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>> only have the clearErrors option available for now
> but
> > >> could
> > >>>>>>>>>>>> accept
> > >>>>>>>>>>>>>>>>> any
> > >>>>>>>>>>>>>>>>>>>> other we may need. I find that this would remove the
> > >>>>>>>> "ugliness"
> > >>>>>>>>>>>>>>>>> Matthias
> > >>>>>>>>>>>>>>>>>>>> pointed out for 3. and 4.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the
> different
> > >>>>>>>> semantics
> > >>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>> flush,
> > >>>>>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush and
> > >>>>>>>>>>>> commitTransaction
> > >>>>>>>>>>>>>>>>>> java
> > >>>>>>>>>>>>>>>>>>>> docs. It currently states that  flush "clears the
> last
> > >>>>>>>> exception"
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called after
> > >> flush,
> > >>>>>> but
> > >>>>>>>> it
> > >>>>>>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>>>>> depends on the config/options/method used.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an
> example
> > to
> > >>>>>> show
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>> flow
> > >>>>>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great gain
> > >> here):
> > >>>>>>>> flush
> > >>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>> clear
> > >>>>>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever error
> > >> handling
> > >>>>>>>> we
> > >>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>> ->
> > >>>>>>>>>>>>>>>>>>>> commitTransaction successfully
> > >>>>>>>>>>>>>>>>>>>>
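A rough sketch of the flow described in LM3, under stated assumptions: the
SketchProducer below is a toy in-memory stand-in (not the Kafka client),
an empty record stands in for any per-record send failure, and flush() is
assumed to clear the last error and then throw it, as proposed in this
thread.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushThenCommitSketch {

    /** Toy stand-in for a transactional producer; NOT the Kafka client API. */
    static class SketchProducer {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();
        private RuntimeException lastError; // latest send() error, if any

        void send(String record) {
            // Assumption: an empty record models a failed send.
            if (record.isEmpty()) {
                lastError = new IllegalArgumentException("record rejected");
            } else {
                buffered.add(record);
            }
        }

        /** Assumed semantics: clear the pending error, then throw it. */
        void flush() {
            RuntimeException error = lastError;
            lastError = null;
            if (error != null) {
                throw error;
            }
        }

        /** Throws if a send error is still pending; otherwise commits the buffer. */
        void commitTransaction() {
            if (lastError != null) {
                throw new IllegalStateException("transaction has an error", lastError);
            }
            committed.addAll(buffered);
            buffered.clear();
        }

        List<String> committed() {
            return committed;
        }
    }

    /** send + flush + commit, with the caller choosing to tolerate the flush error. */
    static List<String> run() {
        SketchProducer producer = new SketchProducer();
        producer.send("a");
        producer.send(""); // this send fails
        producer.send("b");
        try {
            producer.flush();
        } catch (IllegalArgumentException e) {
            // Application-specific handling goes here; this sketch ignores the error.
        }
        producer.commitTransaction(); // succeeds because flush() cleared the error
        return producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without the flush() call, commitTransaction() in this sketch throws, so
plain "send + commit" still fails on error.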
> > >>>>>>>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Lianet
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
> > >>>>>>>>>>>>>>>>> fearthecel...@gmail.com
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more
> that
> > >> might
> > >>>>>>>> help
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> if,
> > >>>>>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
> > >>>>>>>> commitTransaction()
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
> > >>>>>>>> tolerateRecordErrors).
> > >>>>>>>>>>>>>>>> This
> > >>>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the behavioral
> > >> change we
> > >>>>>>>> want,
> > >>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>> applies to transactional producers, to an API
> method
> > >> that
> > >>>>>> is
> > >>>>>>>> only
> > >>>>>>>>>>>>>>>>> used
> > >>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>> transactional producers. It would also avoid the
> > issue
> > >> of
> > >>>>>>>> whether
> > >>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered
> > semantics)
> > >>>>>>>> should
> > >>>>>>>>>>>>>>>> throw
> > >>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>> not. Thoughts?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot
> more
> > >> than
> > >>>>>> the
> > >>>>>>>>>>>>>>>>> pluggable
> > >>>>>>>>>>>>>>>>>>>>> handler!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this
> behavior
> > >> via
> > >>>>>>>>>>>>>>>>> configuration
> > >>>>>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely that
> > >> application
> > >>>>>>>> code
> > >>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>> written in a style that only works with one type of
> > >>>>>> behavior
> > >>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>> transactional producers, so requiring that
> > application
> > >> code
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>>> declare
> > >>>>>>>>>>>>>>>>>> its
> > >>>>>>>>>>>>>>>>>>>>> expectations for the behavior of its producer seems
> > >> more
> > >>>>>>>>>>>>>>>> appropriate
> > >>>>>>>>>>>>>>>>>> than,
> > >>>>>>>>>>>>>>>>>>>>> e.g., allowing users deploying that application to
> > >> tweak a
> > >>>>>>>>>>>>>>>>>> configuration
> > >>>>>>>>>>>>>>>>>>>>> file that gets fed to producers spun up inside it.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
> > >>>>>>>>>>>> mj...@apache.org
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP
> > >> as-is,
> > >>>>>> but
> > >>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>> Artem raises very good points...
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error clearance"
> as
> > >> the
> > >>>>>>>> KIP
> > >>>>>>>>>>>>>>>>> proposes
> > >>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it
> throw
> > >>>>>>>>>>>>>>>>>>>>>>   3. add new `flushAndThrow()` (or better name)
> which
> > >>>>>> clears
> > >>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> throws
> > >>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let
> user
> > >> pick
> > >>>>>>>> (and
> > >>>>>>>>>>>>>>>>>> deprecate
> > >>>>>>>>>>>>>>>>>>>>>> existing `flush()`)
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change,
> > we
> > >>>>>> might
> > >>>>>>>> also
> > >>>>>>>>>>>>>>>>> need
> > >>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>> public "feature flag" config.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem
> > >> mentioned.
> > >>>>>>>> (3)
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> (4)
> > >>>>>>>>>>>>>>>>>>>>>> would be safer to this end, however, for both we
> > >> kinda get
> > >>>>>>>> an
> > >>>>>>>>>>>> ugly
> > >>>>>>>>>>>>>>>>>> API?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems
> > we
> > >> need
> > >>>>>>>> to
> > >>>>>>>>>>>> pick
> > >>>>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>>>> evil and that there is no clear best solution?
> Would
> > >> be
> > >>>>>>>> good to
> > >>>>>>>>>>>>>>>> hear
> > >>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>> others what they think
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of
> > >> suggestions:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush after
> we
> > >> clear
> > >>>>>>>> it.
> > >>>>>>>>>>>>>>>> This
> > >>>>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and "send +
> > >> flush +
> > >>>>>>>>>>>> commit"
> > >>>>>>>>>>>>>>>>> (the
> > >>>>>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way to
> > express
> > >> the
> > >>>>>>>>>>>> former,
> > >>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same) would
> > >> throw if
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>>>>> has an error (so if the code is written either
> way
> > >> it's
> > >>>>>>>> going
> > >>>>>>>>>>>> to be
> > >>>>>>>>>>>>>>>>>>>>>> correct).
> > >>>>>>>>>>>>>>>>>>>>>>> At the same time, the latter could be extended by
> > the
> > >>>>>>>> caller to
> > >>>>>>>>>>>>>>>>>>>>> intercept
> > >>>>>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed, and
> commit
> > >> the
> > >>>>>>>>>>>>>>>>> transaction.
> > >>>>>>>>>>>>>>>>>>>>>> This
> > >>>>>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if
> someone
> > >> has
> > >>>>>>>> code
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> doesn't
> > >>>>>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic
> "send +
> > >>>>>> flush +
> > >>>>>>>>>>>>>>>> commit"
> > >>>>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things possible,
> > an
> > >>>>>>>>>>>> application
> > >>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>> add
> > >>>>>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some errors.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best way to
> > >> express
> > >>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> modification
> > >>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application logic
> that
> > >> calls
> > >>>>>>>>>>>> "flush"
> > >>>>>>>>>>>>>>>>>> needs
> > >>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring
> > >> semantics in
> > >>>>>> a
> > >>>>>>>>>>>>>>>> detached
> > >>>>>>>>>>>>>>>>>>>>> place
> > >>>>>>>>>>>>>>>>>>>>>>> creates room for bugs due to discrepancies.
> This
> > >> can
> > >>>>>> be
> > >>>>>>>>>>>>>>>>> especially
> > >>>>>>>>>>>>>>>>>>>>> bad
> > >>>>>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a file
> at
> > >> run
> > >>>>>>>> time, in
> > >>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>> case a
> > >>>>>>>>>>>>>>>>>>>>>>> mistake in configuration could break the
> > application
> > >>>>>>>> because it
> > >>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>>>>> written
> > >>>>>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the semantics
> > is
> > >>>>>>>> switched.
> > >>>>>>>>>>>>>>>>> Given
> > >>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the caller's
> > >>>>>>>> expectation,
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the caller's
> > >> expectation
> > >>>>>>>> to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> "flush"
> > >>>>>>>>>>>>>>>>>>>>>>> call by either having a method with a different
> name
> > or
> > >>>>>> having
> > >>>>>>>> an
> > >>>>>>>>>>>>>>>>>> overload
> > >>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>> a Boolean flag that would configure the semantics
> > (the
> > >>>>>>>> current
> > >>>>>>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>>>>> just redirect to the new one).
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
> > >>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059
> > that
> > >>>>>>>> suggests
> > >>>>>>>>>>>>>>>>> adding
> > >>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>
> > >>
> > >>
> > >
> >
>
