Hey folks, Great discussion!
Re: throwing exceptions from send().

send() is documented to throw KafkaException, so if the application doesn't handle it, that would be a bug in the application. Now, the docs do have a note that API exceptions wouldn't be thrown; I'm not sure if we have code that relies on that. There is a reason exceptions have classes: they are designed to express a "class of errors" that can be handled, so that we don't have to add a flag or a new method every time we have a new exception to throw. But if there is consensus that it's still too risky (especially if we have examples of code that would break), then I agree that we shouldn't do it.

Re: various ways to communicate the semantics change.

If we must have 2 different behaviors, I think passing an "ignore errors" option to commitTransaction is probably the most intuitive way to do it. What I really don't like (in any of the options) is that we cannot really document it in a way that articulates a value in the product. There are tons of nuances that require understanding some buggy behavior, then the fixed behavior, then an option to sometimes turn the buggy behavior back on, and so on.

> if a user invokes Producer::abortTransaction from within a producer callback today

I think we would get an invalid state exception. We could probably fix that, but even if we supported it, I think it would be good if doing send + commit led to an aborted transaction without special action from the application -- the simple things should be really simple: any failure during send or commit should abort the send + commit sequence without special handling.

-Artem

On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com> wrote:

> One quick thought: if a user invokes Producer::abortTransaction from within
> a producer callback today, even in the midst of an ongoing call to
> Producer::commitTransaction, what is the behavior? Would it be reasonable
> to support this behavior as a way to allow error handling to take place
> during implicit flushes, via producer callback?
>
> On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > My point is that it does not seem to be safe to allow users to ignore
> > errors with an implicit flush, and I think it's better to only allow it
> > with (ie, after) an explicit flush().
> >
> > My reasoning is that users should make a decision to ignore errors or
> > not, before calling `commitTx()`, but after inspecting all potential
> > send errors. With an implicit flush, users need to "blindly" decide to
> > ignore send errors, because there are pending sends and potential errors
> > are not known yet, when calling `commitTx()`.
> >
> > > In the documentation of commitTransaction, we say if any send throws an
> > > error, commitTransaction will too.
> >
> > Yes. And I think we should keep it this way for an implicit flush. With
> > an explicit flush, `commitTransaction()` cannot encounter any send
> > errors any longer.
> >
> > > It says that all callbacks will be executed, but we ignore the errors of
> > > the callbacks.
> >
> > Ah. Thanks for pointing this out. For this case it's even worse (for
> > case (2)), because the user cannot inspect any errors and make any
> > decision to ignore or not during an implicit flush...
> >
> > > We shouldn't be relying on errors in the callback unless we are
> > > calling flush, which we can still do. It seems this has always been the
> > > case as well.
> >
> > Yes, it has always been this way, and my point is to keep it this way
> > (option (2) would change it), and not start to allow ignoring errors
> > with an implicit flush.
> >
> > -Matthias
> >
> > On 6/24/24 4:57 PM, Justine Olshan wrote:
> > > Transaction verification is a concept from KIP-890 referring to the
> > > verification that a partition has been added to the transaction. It's not a
> > > huge deal, but maybe we don't want to overload the terminology.
> > >
> > > For option 2, I was a little confused by this
> > >
> > >> when commitTx is called, there is still pending Futures and not
> > >> all Callbacks are executed yet -- with the implicit flush, we know that
> > >> all Callbacks are executed, but even for this case, the user could only
> > >> throw an exception inside the Callback to stop the TX to eventually
> > >> commit -- Futures cannot be used to make a decision to ignore error and
> > >> commit or not.
> > >
> > > In the documentation of commitTransaction, we say if any send throws an
> > > error, commitTransaction will too.
> > >
> > > *Further, if any of the {@link #send(ProducerRecord)} calls which were part
> > > of the transaction hit irrecoverable errors, this method will throw the
> > > last received exception immediately and the transaction will not be
> > > committed.*
> > >
> > > It says that all callbacks will be executed, but we ignore the errors of
> > > the callbacks.
> > >
> > > *If the transaction is committed successfully and this method returns
> > > without throwing an exception, it is guaranteed that all {@link Callback
> > > callbacks} for records in the transaction will have been invoked and
> > > completed. Note that exceptions thrown by callbacks are ignored; the
> > > producer proceeds to commit the transaction in any case.*
> > >
> > > Is it fair to say though that for the send errors, we can choose to ignore
> > > them? I wasn't understanding where the callbacks come in with your
> > > comment. We shouldn't be relying on errors in the callback unless we are
> > > calling flush, which we can still do. It seems this has always been the
> > > case as well.
> > >
> > > Justine
> > >
> > > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <andrew_schofi...@live.com>
> > > wrote:
> > >
> > >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with 3a as
> > >> the way.
> > >>
> > >> I suggest “TRANSACTION VERIFIED”.
> > >>
> > >> There isn’t really precedent for options in the producer API. We could use
> > >> an enum, which is easy to use and not very future-proof. Or we could use a
> > >> class like the admin API does, which is cumbersome and flexible.
> > >>
> > >>     CommitTransactionOptions.TRANSACTION_VERIFIED
> > >>
> > >> or
> > >>
> > >>     public class CommitTransactionOptions {
> > >>         public CommitTransactionOptions();
> > >>
> > >>         CommitTransactionOptions transactionVerified(boolean transactionVerified);
> > >>
> > >>         boolean transactionVerified();
> > >>     }
> > >>
> > >> Then 3b is:
> > >>
> > >>     send(…)
> > >>     send(…)
> > >>     flush()
> > >>     commitTransaction(new CommitTransactionOptions().transactionVerified(true))
> > >>
> > >> I’d tend towards the enum here because I doubt we need as much flexibility
> > >> as the admin API requires.
> > >>
> > >> Thanks,
> > >> Andrew
> > >>
> > >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
> > >>>
> > >>> I am ok either way (ie, flush or commit), but I think we need to define
> > >>> exact semantics, and I think there is some subtle thing to consider:
> > >>>
> > >>> 1) flush(Options)
> > >>>
> > >>> Example:
> > >>>
> > >>>     send(...)
> > >>>     send(...)
> > >>>
> > >>>     flush(ignoreErrors)
> > >>>
> > >>>     // at this point, we know that all Futures are completed and all
> > >>>     // Callbacks are executed, and we can assume that all user code checking
> > >>>     // for errors did execute, before `commitTx` is called
> > >>>
> > >>>     // I consider this option as safe
> > >>>
> > >>>     commitTx()
> > >>>
> > >>> 2) commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>     send(...)
> > >>>     send(...)
> > >>>
> > >>>     // when commitTx is called, there are still pending Futures and not all
> > >>>     // Callbacks are executed yet -- with the implicit flush, we know that all
> > >>>     // Callbacks are executed, but even for this case, the user could only throw
> > >>>     // an exception inside the Callback to stop the TX to eventually commit --
> > >>>     // Futures cannot be used to make a decision to ignore error and commit or not.
> > >>>
> > >>>     // I consider this option not as safe
> > >>>
> > >>>     commitTx(ignoreErrors)
> > >>>
> > >>> 3a) required flush + commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>     send(...)
> > >>>     send(...)
> > >>>
> > >>>     flush()
> > >>>
> > >>>     // at this point, we know that all Futures are completed and all
> > >>>     // Callbacks are executed, and we can assume that all user code checking
> > >>>     // for errors did execute, before `commitTx` is called
> > >>>
> > >>>     // I consider this option as safe
> > >>>
> > >>>     commitTx(ignoreErrors)
> > >>>
> > >>> 3b) missing flush + commitTx(Option)
> > >>>
> > >>> Example:
> > >>>
> > >>>     send(...)
> > >>>     send(...)
> > >>>
> > >>>     // as flush() was not called explicitly, we should ignore the
> > >>>     // `ignoreErrors` flag and always throw an exception if the producer is in
> > >>>     // error state, because we cannot be sure that the user did all required
> > >>>     // checks for error handling
> > >>>
> > >>>     commitTx(ignoreErrors)
> > >>>
> > >>> The only issue with option (3) is that it's more complex and semantics
> > >>> are more subtle. But it might be a good (necessary?) bridge between (1)
> > >>> and (2): (3) is semantically sound (we ignore errors via passing a flag
> > >>> into commitTx() instead of flush()), and at the same time safe (we force
> > >>> users to explicitly flush() and [hopefully] do proper error handling, and
> > >>> don't rely on an implicit flush() during commitTx() which might be error
> > >>> prone).
> > >>>
> > >>> (Also need to find a good and descriptive name for the flag we pass into
> > >>> `commitTx()` for this case.)
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> > >>>> Hi Chris,
> > >>>> That works for me too. I slightly prefer an option on flush(), but what
> > >>>> you suggested works too.
> > >>>> Thanks,
> > >>>> Andrew
> > >>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> > >>>>>
> > >>>>> Hi Andrew,
> > >>>>>
> > >>>>> I like a lot of what you said, but I still believe it's better to overload
> > >>>>> commitTransaction than flush. Users will already have to manually opt in to
> > >>>>> ignoring errors encountered during transactions, and we can document
> > >>>>> recommended usage (i.e., explicitly invoking flush() before invoking
> > >>>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I
> > >>>>> don't believe it's worth the increased cognitive load on users with
> > >>>>> non-transactional producers to introduce an overloaded flush() variant.
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Chris
> > >>>>>
> > >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Thanks for driving this. Unfortunately, there are many parts of the API
> > >>>>>> which are a bit unfortunate and it’s tricky to make small improvements that
> > >>>>>> don’t have downsides.
> > >>>>>>
> > >>>>>> I don’t like the idea of using a configuration because configuration is
> > >>>>>> often outside the application and changing the behaviour of someone else’s
> > >>>>>> application without understanding it is risky. Anything which embeds a
> > >>>>>> transactional producer could have its behaviour changed unexpectedly.
> > >>>>>>
> > >>>>>> It would have been much nicer if send() didn’t fail silently and change the
> > >>>>>> transaction state. But, because it’s an asynchronous operation, I don’t
> > >>>>>> really think we can just make it throw all exceptions, even though I really
> > >>>>>> think that `send()` is the method with the problem here.
> > >>>>>>
> > >>>>>> The contract of `flush()` is that it makes sure that all preceding sends
> > >>>>>> will have completed, so it should be true that a well written application
> > >>>>>> would be able to know which records were OK because of the
> > >>>>>> Future<RecordMetadata> returned by the `send()` method. It should be able to
> > >>>>>> determine whether it wants to commit the transaction even if some of the
> > >>>>>> intended operations didn’t succeed.
> > >>>>>>
> > >>>>>> What we don’t currently have is a way for the application to say to the
> > >>>>>> KafkaProducer that it knows the outcome of sending the records and to
> > >>>>>> confirm that it wants to proceed. Then it would not be necessary for
> > >>>>>> `commitTransaction()` to throw an exception to report a historical error
> > >>>>>> which the application might choose to ignore.
> > >>>>>>
> > >>>>>> Having read the comments, I think the KIP is on the right lines focusing
> > >>>>>> on the `flush()` method. My suggestion is that we introduce an option on
> > >>>>>> `flush()` to be used before `commitTransaction()` for applications that
> > >>>>>> want to be able to commit transactions which had known failed operations.
> > >>>>>>
> > >>>>>> The code would be:
> > >>>>>>
> > >>>>>>     producer.beginTransaction();
> > >>>>>>
> > >>>>>>     future1 = producer.send(goodRecord1);
> > >>>>>>     future2 = producer.send(badRecord); // The future from this call will complete exceptionally
> > >>>>>>     future3 = producer.send(goodRecord2);
> > >>>>>>
> > >>>>>>     producer.flush(FlushOption.TRANSACTION_READY);
> > >>>>>>
> > >>>>>>     // At this point, we know that all 3 futures are complete and the
> > >>>>>>     // transaction contains 2 records
> > >>>>>>     producer.commitTransaction();
> > >>>>>>
> > >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses the default
> > >>>>>> option which behaves like today.
> > >>>>>>
> > >>>>>> Why did I suggest an option on `flush()` rather than
> > >>>>>> `commitTransaction()`? Because with `flush()`, it’s clear when the
> > >>>>>> application is stating that it’s seen all of the results from its
> > >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on flushing
> > >>>>>> that occurs inside `commitTransaction()`, I don’t see it’s as clear-cut.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> Thanks for the interesting discussion.
> > >>>>>>>
> > >>>>>>> I assume that now the main questions are as follows:
> > >>>>>>>
> > >>>>>>> 1. Do we need to transition the transaction to the error state for API
> > >>>>>>> exceptions?
> > >>>>>>> 2. Should we throw the API exception in `send()` instead of returning a
> > >>>>>>> future error?
> > >>>>>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we
> > >>>>>>> need to change the current `flush` or `commitTx` at all?
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Alieh
> > >>>>>>>
> > >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Hey Kirk,
> > >>>>>>>>
> > >>>>>>>> can you elaborate on a few points?
> > >>>>>>>>
> > >>>>>>>>> Otherwise users would have to know to explicitly change their code to
> > >>>>>>>>> invoke flush().
> > >>>>>>>>
> > >>>>>>>> Why? If we would add an option to `flush(FlushOption)`, the existing
> > >>>>>>>> `flush()` w/o any option will still be there, right? If we would really
> > >>>>>>>> deprecate existing `flush()`, it would just mean that we would pass
> > >>>>>>>> "default FlushOption" into an implicit flush (and yes, we would need to
> > >>>>>>>> define what this would be).
> > >>>>>>>>
> > >>>>>>>> I think there is no clear winner (as pointed out in my last reply), and
> > >>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages
> > >>>>>>>> and drawbacks. Guess we need to just agree on which tradeoff we want to
> > >>>>>>>> move forward with?
> > >>>>>>>>
> > >>>>>>>> Not sure if your database example is a 1:1 fit? I think the better
> > >>>>>>>> comparison would be:
> > >>>>>>>>
> > >>>>>>>>     BEGIN TX;
> > >>>>>>>>     INSERT INTO foo VALUES ('a');
> > >>>>>>>>     INSERT INTO foo VALUES ('b');
> > >>>>>>>>     INSERT INTO foo VALUES ('c');
> > >>>>>>>>     INSERT INTO foo VALUES ('not sure');
> > >>>>>>>>
> > >>>>>>>> For this case, the full TX would roll back, right? I still think that
> > >>>>>>>> allowing users to just skip over the last error, and continue the TX
> > >>>>>>>> would be ok. In the end, we provide a programmatic API, and not a
> > >>>>>>>> declarative one as SQL. Of course, default behavior would still be to
> > >>>>>>>> put the producer into error state, and the user would need to call
> > >>>>>>>> `abortTransaction()` to move forward.
> > >>>>>>>> > > >>>>>>>> > > >>>>>>>> -Matthias > > >>>>>>>> > > >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote: > > >>>>>>>>> Hi Matthias, > > >>>>>>>>> > > >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax < > mj...@apache.org > > > > > >>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>> If we want to limit it to `RecordTooLargeException` throwing > > from > > >>>>>>>> `send()` directly make sense. Thanks for calling it out. > > >>>>>>>>>> > > >>>>>>>>>> It's still a question of backward compatibility? `send()` does > > >> throw > > >>>>>>>> exceptions already, including generic `KafkaException`. Not sure > > if > > >> this > > >>>>>>>> helps with backward compatibility? Could we just add a new > > exception > > >>>>>> type > > >>>>>>>> (which is a child of `KafkaException`)? > > >>>>>>>>>> > > >>>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO. > > >>>>>>>>>> > > >>>>>>>>>> I think we could expect that some generic error handling path > > gets > > >>>>>>>> executed. For the TX-case, I would assume that a TX would be > > >> aborted if > > >>>>>>>> `send()` throws or that the producer would be `closed()`. > Overall > > >> this > > >>>>>>>> might be safe? > > >>>>>>>>>> > > >>>>>>>>>>> It would be a little less flexible > > >>>>>>>>>>>> though, since (as you note) it would still be impossible to > > >> commit > > >>>>>>>>>>>> transactions after errors have been reported from brokers. > > >>>>>>>>>> > > >>>>>>>>>> KS would still need a way to clear the error state of the > > >> producer. We > > >>>>>>>> could catch a `RecordTooLargeException` from `send()`, call the > > >> handler > > >>>>>> and > > >>>>>>>> let it decide what to do next. But if it does return `CONTINUE` > to > > >>>>>> swallow > > >>>>>>>> the error and drop the poison pill record on the floor, we would > > >> want to > > >>>>>>>> move forward and commit the transaction. 
> > >>>>>>>>>> > > >>>>>>>>>> But the question is: if we cannot add a record to the tx, does > > the > > >>>>>>>> producer need to go into error state? In the end, we did throw > and > > >>>>>> inform > > >>>>>>>> the app that the record was _not_ added, and it's up to the app > to > > >>>>>> decide > > >>>>>>>> what to do next? > > >>>>>>>>> > > >>>>>>>>> That’s an excellent question… > > >>>>>>>>> > > >>>>>>>>> Imagine the user’s application is writing information to a > > database > > >>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and > > this > > >> SQL > > >>>>>>>> statement was attempted, what should happen? > > >>>>>>>>> > > >>>>>>>>> INSERT INTO foo VALUES (’not sure’); > > >>>>>>>>> > > >>>>>>>>> Yes, that DML would fail, sure, but would the user expect that > > the > > >>>>>>>> connection used by database library would get stuck in some kind > > of > > >>>>>> error > > >>>>>>>> state? A user would be able catch the error and either continue > or > > >>>>>> abort, > > >>>>>>>> based on their business rules. > > >>>>>>>>> > > >>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t > > >> poison the > > >>>>>>>> Producer/TransactionManager on certain types of > application-level > > >>>>>> errors in > > >>>>>>>> send(). > > >>>>>>>>> > > >>>>>>>>> Kirk > > >>>>>>>>> > > >>>>>>>>>> If we report the error only via the `Callback` it's a > different > > >> story, > > >>>>>>>> because the contract for this case is clearly specified on the > > >> JavaDocs: > > >>>>>>>>>> > > >>>>>>>>>>> When used as part of a transaction, it is not necessary to > > >> define a > > >>>>>>>> callback or check the result of the future > > >>>>>>>>>>> in order to detect errors from <code>send</code>. If any of > the > > >> send > > >>>>>>>> calls failed with an irrecoverable error, > > >>>>>>>>>>> the final {@link #commitTransaction()} call will fail and > throw > > >> the > > >>>>>>>> exception from the last failed send. 
When > > >>>>>>>>>>> this happens, your application should call {@link > > >>>>>> #abortTransaction()} > > >>>>>>>> to reset the state and continue to send > > >>>>>>>>>>> data. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> -Matthias > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote: > > >>>>>>>>>>> Hi Artem, > > >>>>>>>>>>> I think it'd make sense to throw directly from send whenever > > >>>>>> possible, > > >>>>>>>>>>> instead of returning an already-completed future. I didn't do > > >> that in > > >>>>>>>> my > > >>>>>>>>>>> bug fix to try to be conservative about breaking changes but > > this > > >>>>>>>> seems to > > >>>>>>>>>>> have caused its own set of headaches. It would be a little > less > > >>>>>>>> flexible > > >>>>>>>>>>> though, since (as you note) it would still be impossible to > > >> commit > > >>>>>>>>>>> transactions after errors have been reported from brokers. > > >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that > > >>>>>>>> flexibility > > >>>>>>>>>>> is required. If it is, then users could explicitly call > flush() > > >>>>>> before > > >>>>>>>>>>> committing (and ignoring errors for) or aborting a > transaction, > > >> if > > >>>>>> they > > >>>>>>>>>>> want to implement fine-grained error handling logic such as > > >> allowing > > >>>>>>>> errors > > >>>>>>>>>>> for a subset of topics to be ignored. > > >>>>>>>>>>> Hi Matthias, > > >>>>>>>>>>> Most of the time you're right and we can't throw from send(); > > >>>>>> however, > > >>>>>>>> in > > >>>>>>>>>>> this case (client-side record-too-large exception), the error > > is > > >>>>>>>> actually > > >>>>>>>>>>> noticed by the producer before send() returns, so it should > be > > >>>>>>>> possible to > > >>>>>>>>>>> throw directly. > > >>>>>>>>>>> Cheers, > > >>>>>>>>>>> Chris > > >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. 
Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>> Not sure if we can change send and make it throw, given that send() is
> > >>>>>>>>>>>> async? That is why users can register a `Callback` to begin with,
> > >>>>>>>>>>>> right?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> And Alieh's point about backward compatibility is also a fair concern.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Actually, this would potentially be even
> > >>>>>>>>>>>>> worse than the original buggy behavior because the bug was that we
> > >>>>>>>>>>>>> ignored errors that happened in the "send()" method itself, not
> > >>>>>>>>>>>>> necessarily the ones that we got from the broker.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only swallow
> > >>>>>>>>>>>> `send()` errors, not errors about the actual commit. I agree that it
> > >>>>>>>>>>>> would be very bad to swallow errors about the actual tx commit...
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It's a fair question if this might be too subtle; to make it explicit,
> > >>>>>>>>>>>> we could use `CommitOptions#ignorePendingSendErrors()` [working name] to
> > >>>>>>>>>>>> make it clear.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If we think it's too subtle to change commit to swallow send() errors,
> > >>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer (and we can use
> > >>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working name] to be
> > >>>>>>>>>>>> explicit that the `FlushOption` for now has only an effect for TX).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> -Matthias > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote: > > >>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> It is very exciting to see all the experts here raising > very > > >> good > > >>>>>>>> points. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> As we go further, we see more and more options to improve > our > > >>>>>>>> solution, > > >>>>>>>>>>>>> which makes concluding and updating the KIP impossible. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> The main suggestions so far are: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 3. `send` must throw the exception > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> My concern about the 3rd suggestion: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 1. Does the change cause any issue with backward > > compatibility? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 2. The `send (bad record)` already transits the transaction > > to > > >> the > > >>>>>>>> error > > >>>>>>>>>>>>> state. No user, including Streams is able to transit the > > >>>>>> transaction > > >>>>>>>> back > > >>>>>>>>>>>>> from the error state. Do you mean we remove the > > >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here > > >>>>>>>>>>>>> < > > >>>>>>>>>>>> > > >>>>>>>> > > >>>>>> > > >> > > > https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112 > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> as well? 
> > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>> Alieh > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield < > > >>>>>>>>>>>> andrew_schofi...@live.com> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Hi Artem, > > >>>>>>>>>>>>>> I think you make a good point which is worth further > > >>>>>> consideration. > > >>>>>>>> If > > >>>>>>>>>>>>>> any of the existing methods is really ripe for a change > > here, > > >> it’s > > >>>>>>>> the > > >>>>>>>>>>>>>> send() that actually caused the problem. If that can be > > fixed > > >> so > > >>>>>>>> there > > >>>>>>>>>>>> are > > >>>>>>>>>>>>>> no situations in which a lurking error breaks a > transaction, > > >> that > > >>>>>>>> might > > >>>>>>>>>>>> be > > >>>>>>>>>>>>>> the best. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>> Andrew > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits < > > >> alivsh...@confluent.io > > >>>>>>>>>>>> .INVALID> > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) > to > > >> come > > >>>>>>>> in and > > >>>>>>>>>>>>>>> could handle fatal errors appropriately. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that > when > > >>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to ignore > > >> errors. > > >>>>>>>> So > > >>>>>>>>>>>> if > > >>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>> do > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 1. send > > >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors") > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> the commit will succeed. 
You can look at the example in > > >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and > just > > >>>>>>>> substitute > > >>>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore send > > >> errors") > > >>>>>> and > > >>>>>>>> we > > >>>>>>>>>>>> get > > >>>>>>>>>>>>>>> the buggy behavior back :-). Actually, this would > > >> potentially be > > >>>>>>>> even > > >>>>>>>>>>>>>>> worse than the original buggy behavior because the bug > was > > >> that > > >>>>>> we > > >>>>>>>>>>>>>> ignored > > >>>>>>>>>>>>>>> errors that happened in the "send()" method itself, not > > >>>>>>>> necessarily the > > >>>>>>>>>>>>>>> ones that we got from the broker. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Actually, looking at > > >>>>>>>> https://github.com/apache/kafka/pull/11508/files, > > >>>>>>>>>>>>>>> wouldn't a better solution be to just throw the error > from > > >> the > > >>>>>>>> "send" > > >>>>>>>>>>>>>>> method itself, rather than trying to set it to be thrown > > >> during > > >>>>>>>> commit? > > >>>>>>>>>>>>>>> This way the example in > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 > > >>>>>>>>>>>>>>> would be fixed, and at the same time it would give an > > >> opportunity > > >>>>>>>> for > > >>>>>>>>>>>> KS > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>> catch the error and ignore it if needed. Not sure if we > > >> need a > > >>>>>>>> KIP for > > >>>>>>>>>>>>>>> that, just do a better fix of the old bug. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> -Artem > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan > > >>>>>>>>>>>>>> <jols...@confluent.io.invalid> > > >>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here > looks > > >>>>>>>> reasonable. 
> > >>>>>>>>>>>>>>>> Moving the logic to a transactional method makes sense > to > > >> me and > > >>>>>>>> makes > > >>>>>>>>>>>>>> me > > >>>>>>>>>>>>>>>> feel a bit better about keeping the complexity in the > > >> methods > > >>>>>>>> relevant > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>> the issue. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send > > >>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option > without > > >>>>>> explicit > > >>>>>>>>>>>>>> flush, > > >>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the > application > > >> won't > > >>>>>> be > > >>>>>>>>>>>> able > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Is this with respect to the case a request is still > > inflight > > >>>>>> when > > >>>>>>>> we > > >>>>>>>>>>>>>> call > > >>>>>>>>>>>>>>>> commitTransaction? I thought we still wait for requests > > (and > > >>>>>> their > > >>>>>>>>>>>>>> errors) > > >>>>>>>>>>>>>>>> to come in and could handle fatal errors appropriately. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Justine > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits > > >>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas), > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar > > could > > >> be a > > >>>>>>>> good > > >>>>>>>>>>>>>>>> way > > >>>>>>>>>>>>>>>>> forward? > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I like this approach. 
One minor concern is that if we > > set > > >>>>>>>> "ignore > > >>>>>>>>>>>> send > > >>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option > without > > >>>>>>>> explicit > > >>>>>>>>>>>>>> flush, > > >>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the > application > > >> won't > > >>>>>>>> be > > >>>>>>>>>>>> able > > >>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors. > But > > I > > >>>>>> guess > > >>>>>>>>>>>> we'll > > >>>>>>>>>>>>>>>> just > > >>>>>>>>>>>>>>>>> have to clearly document it. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> In some way we are basically adding a flag to > optionally > > >>>>>> restore > > >>>>>>>> the > > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug, > > >> which is > > >>>>>>>> the > > >>>>>>>>>>>>>>>>> motivation for all these changes, anyway :-). > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> -Artem > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax < > > >>>>>>>> mj...@apache.org> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of > > >>>>>> support. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> So we need to go with some form or "overload / new > > >> method". I > > >>>>>>>> think > > >>>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but > > rather > > >>>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one; for > > >> non-tx > > >>>>>>>> case, > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>> different flush variants would not make sense. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" > > >> object, so > > >>>>>>>> maybe > > >>>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could > be a > > >> good > > >>>>>>>> way > > >>>>>>>>>>>>>>>>>> forward? 
> > >>>>>>>>>>>>>>>>>> It's much better than a `boolean` parameter, aesthetically, as well as extendable in the future if necessary.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve and some of the other suggestions in this thread seem neater.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>> Andrew
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM1.
> > >>>>>>>>>>>>>>>>>>>> Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Getting then to Matthias's 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction java docs.
> > >>>>>>>>>>>>>>>>>>>> It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Lianet
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors).
> > >>>>>>>>>>>>>>>>>>>>> This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit. It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error clearance" as the KIP proposes
> > >>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it throw
> > >>>>>>>>>>>>>>>>>>>>>>   3. add new `flushAndThrow()` (or better name) which clears error and throws
> > >>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end, however, for both we kinda get an ugly API?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference.
> > >>>>>>>>>>>>>>>>>>>>>> Seems we need to pick some evil and that there is no clear best solution? Would be good to hear from others what they think
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction.
> > >>>>>>>>>>>>>>>>>>>>>>> This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible: an application can add try + catch around flush and ignore some errors.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time; in that case a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched.
> > >>>>>>>>>>>>>>>>>>>>>>> Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>> Alieh
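
For reference, the "send + commit", "send + flush + commit", and "try + catch around flush" flows discussed in the quoted thread (AL1, LM3) could be sketched roughly as below. This is only a toy model under stated assumptions: `SketchProducer` is a hypothetical stand-in, not the real KafkaProducer, and its `flush()` implements the AL1 proposal (throw the pending send error after clearing it), which is not confirmed current behavior. Records containing "bad" simulate a failed send.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; none of these classes are part of the Kafka client.
class FlushSemanticsSketch {

    // Hypothetical stand-in for a transactional producer.
    static class SketchProducer {
        private RuntimeException pendingError;            // last send() failure, if any
        private final List<String> buffered = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        void beginTransaction() {
            buffered.clear();
            pendingError = null;
        }

        // A record containing "bad" simulates a failed send; the error is remembered.
        void send(String record) {
            if (record.contains("bad")) {
                pendingError = new RuntimeException("send failed: " + record);
            } else {
                buffered.add(record);
            }
        }

        // Assumed AL1 semantics: clear the pending error, then throw it.
        void flush() {
            RuntimeException e = pendingError;
            pendingError = null;
            if (e != null) {
                throw e;
            }
        }

        // Commit refuses to proceed while a send error is still pending.
        void commitTransaction() {
            if (pendingError != null) {
                RuntimeException cause = pendingError;
                pendingError = null;
                buffered.clear();
                throw new IllegalStateException("commit failed, transaction aborted", cause);
            }
            committed.addAll(buffered);
            buffered.clear();
        }

        List<String> committed() {
            return new ArrayList<>(committed);
        }
    }

    // Basic flow: "send + commit". Throws if any send failed.
    static List<String> simpleCommit(String... records) {
        SketchProducer p = new SketchProducer();
        p.beginTransaction();
        for (String r : records) {
            p.send(r);
        }
        p.commitTransaction();
        return p.committed();
    }

    // Advanced flow: "send + flush + commit" with try + catch around flush,
    // so the caller decides to ignore the send error and still commit.
    static List<String> tolerantCommit(String... records) {
        SketchProducer p = new SketchProducer();
        p.beginTransaction();
        for (String r : records) {
            p.send(r);
        }
        try {
            p.flush();
        } catch (RuntimeException e) {
            // Application-specific handling would go here; this sketch ignores the error.
        }
        p.commitTransaction();
        return p.committed();
    }

    public static void main(String[] args) {
        System.out.println("tolerant: " + tolerantCommit("a", "bad", "b"));
        try {
            simpleCommit("a", "bad", "b");
        } catch (IllegalStateException e) {
            System.out.println("simple: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the basic path fails without any special handling, while the caller opts into ignoring errors at the call site rather than through a detached config. A `commitTransaction(CommitOptions)` variant would move the same opt-in into the commit call instead.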