Or are you going to maintain some internal state such that the `abortTransaction` call in the catch block would never throw again?
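Below is a minimal sketch of the "internal state" option raised above. It uses hypothetical stand-in classes (`SketchTxnClient`, `SketchTxnState`, `SketchCommitFailedException`) instead of the real `KafkaProducer`. A non-fatal commit failure moves the client into an abortable state. An abort from that state, or from a ready state where the txn was already aborted (the no-op behavior requested in point 0) of the Dec 4 message), only resets bookkeeping, so the catch block needs no nested try. This models state transitions only. A real `abortTransaction()` talks to the transaction coordinator and can still fail, for example with a timeout.

```java
// Hypothetical stand-ins for illustration only; these are NOT Kafka's classes.
class SketchCommitFailedException extends RuntimeException {
    SketchCommitFailedException(String message) { super(message); }
}

class SketchFatalException extends RuntimeException {
    SketchFatalException(String message) { super(message); }
}

enum SketchTxnState { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

class SketchTxnClient {
    private SketchTxnState state = SketchTxnState.READY;
    boolean failNextCommit = false; // simulation knob, not a real config

    SketchTxnState state() { return state; }

    void beginTransaction() {
        if (state != SketchTxnState.READY) {
            throw new IllegalStateException("cannot begin in state " + state);
        }
        state = SketchTxnState.IN_TRANSACTION;
    }

    void commitTransaction() {
        if (state != SketchTxnState.IN_TRANSACTION) {
            throw new IllegalStateException("cannot commit in state " + state);
        }
        if (failNextCommit) {
            failNextCommit = false;
            // Remember that only an abort is legal now, then surface the failure.
            state = SketchTxnState.ABORTABLE_ERROR;
            throw new SketchCommitFailedException("commit failed with an abortable error");
        }
        state = SketchTxnState.READY;
    }

    void abortTransaction() {
        switch (state) {
            case IN_TRANSACTION:
            case ABORTABLE_ERROR:
                // A real client would also ask the coordinator to abort; here only
                // the local state is reset, so this path cannot fail.
                state = SketchTxnState.READY;
                return;
            case READY:
                return; // already aborted (e.g. by the broker): a no-op, not an error
            default:
                throw new SketchFatalException("client is in a fatal state");
        }
    }
}

class SketchCommitLoop {
    /** One commit attempt; returns "committed" or "aborted". */
    static String commitOrAbort(SketchTxnClient client, boolean failCommit) {
        client.beginTransaction();
        client.failNextCommit = failCommit;
        try {
            client.commitTransaction();
            return "committed";
        } catch (SketchCommitFailedException e) {
            // resetToLastCommittedPositions(consumer) would go here.
            client.abortTransaction(); // no nested try: ABORTABLE_ERROR never throws here
            return "aborted";
        }
    }
}
```

The design choice is to move the "can this abort fail?" question out of user code and into the client's state machine. The trade-off is that the guarantee only holds for the local bookkeeping, not for the network round trip.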
On Fri, Jan 22, 2021 at 11:01 AM Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Boyang/Jason,
>
> I've also thought about this (i.e. using CommitFailed for all non-fatal errors), but what I'm pondering is this: in the catch (CommitFailed) block, what would happen if `producer.abortTransaction()` throws again? Should that be treated as fatal and cause the client to close as well?
>
> If yes, then naively the pattern would be:
>
> ...
> catch (CommitFailedException e) {
>     // Transaction commit failed with abortable error, user could reset
>     // the application state and resume with a new transaction. The root
>     // cause was wrapped in the thrown exception.
>     resetToLastCommittedPositions(consumer);
>     try {
>         producer.abortTransaction();
>     } catch (KafkaException abortError) {
>         // The abort itself failed: treat it as fatal.
>         producer.close();
>         consumer.close();
>         throw abortError;
>     }
> } catch (KafkaException e) {
>     producer.close();
>     consumer.close();
>     throw e;
> }
> ...
>
> Guozhang
>
> On Fri, Jan 22, 2021 at 10:47 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
>> Hey Guozhang,
>>
>> Jason and I were discussing the new API offline and decided to take another approach. Firstly, the reason not to introduce a new API with a returned boolean flag is compatibility: old EOS code would not know that a given transaction commit failed internally, since it does not check the returned flag. Our proposed alternative is to let *commitTransaction throw CommitFailedException whenever the commit fails with a non-fatal exception*, so that on the caller side the handling logic becomes:
>>
>> try {
>>     if (shouldCommit) {
>>         producer.commitTransaction();
>>     } else {
>>         resetToLastCommittedPositions(consumer);
>>         producer.abortTransaction();
>>     }
>> } catch (CommitFailedException e) {
>>     // Transaction commit failed with abortable error, user could reset
>>     // the application state and resume with a new transaction. The root
>>     // cause was wrapped in the thrown exception.
>>     resetToLastCommittedPositions(consumer);
>>     producer.abortTransaction();
>> } catch (KafkaException e) {
>>     producer.close();
>>     consumer.close();
>>     throw e;
>> }
>>
>> This approach looks cleaner, as all exception types other than CommitFailed are treated as fatal, which is very easy for users to adopt. At the same time, we keep the commitTxn behavior of throwing instead of silently failing.
>>
>> In addition, we decided to drop the recommendation to handle TimeoutException and leave it to users to make the call. The downside of blindly calling abortTxn upon timeout is that it could lead to an illegal state when the commit had already succeeded on the broker side. Without a good guarantee on the outcome, overcomplicating the template should not be encouraged IMHO.
>>
>> Let me know your thoughts on the new approach, thank you!
>>
>> Best,
>> Boyang
>>
>> On Tue, Jan 19, 2021 at 11:11 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>> Thanks for your clarification on 2)/3), that makes sense.
>>>
>>> On Tue, Jan 19, 2021 at 10:16 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>> Thanks for the input Guozhang, replied inline.
>>>>
>>>> On Mon, Jan 18, 2021 at 8:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>> Hello Boyang,
>>>>>
>>>>> Thanks for the updated KIP. I read it again and have the following thoughts:
>>>>>
>>>>> 0. I'm a bit concerned that if commitTxn does not throw any non-fatal exception, and we instead rely on the subsequent beginTxn call to throw, it may break callers whose pattern relies on commitTxn succeeding before performing some non-rollback operations.
>>>>> For example:
>>>>>
>>>>> beginTxn()
>>>>> // do some read-write on my local DB
>>>>> commitTxn()
>>>>> // if commitTxn succeeds, then commit the DB
>>>>>
>>>>> -------------
>>>>>
>>>>> The issue is that committing the DB is a non-rollback operation, and users may rely on commitTxn returning without error before making this non-rollback call. Of course we can just claim this pattern is not legitimate and not the right way of doing things, but many users may naturally adopt it.
>>>>>
>>>>> So maybe we should still let commitTxn throw non-fatal exceptions as well, in which case the caller would then call abortTxn.
>>>>>
>>>>> But if we do this, the pattern becomes:
>>>>>
>>>>> try {
>>>>>     beginTxn()
>>>>>     // do something
>>>>> } catch (Exception) {
>>>>>     shouldCommit = false;
>>>>> }
>>>>>
>>>>> if (shouldCommit) {
>>>>>     try {
>>>>>         commitTxn()
>>>>>     } catch (...) { // enumerate all fatal exceptions
>>>>>         shutdown()
>>>>>     } catch (KafkaException) {
>>>>>         // non-fatal
>>>>>         shouldCommit = false;
>>>>>     }
>>>>> }
>>>>>
>>>>> if (!shouldCommit && running()) {
>>>>>     try {
>>>>>         abortTxn()
>>>>>     } catch (KafkaException) {
>>>>>         // only throw fatal
>>>>>         shutdown()
>>>>>     }
>>>>> }
>>>>>
>>>>> ---------------------
>>>>>
>>>>> Which is much more complicated.
>>>>>
>>>>> That makes me think the alternative we discussed offline may be better: let commitTxn() return a boolean flag.
>>>>>
>>>>> * If it returns true, the commit succeeded. Users can comfortably continue and do any external non-rollback operations if they like.
>>>>> * If it returns false, the commit failed with a non-fatal error *AND the txn has been aborted*. Users do not need to call abortTxn again.
>>>>> * If it throws, a fatal error occurred. Users should shut down the client.
>>>>>
>>>>> In this case, the pattern becomes:
>>>>>
>>>>> try {
>>>>>     beginTxn()
>>>>>     // do something
>>>>> } catch (Exception) {
>>>>>     shouldCommit = false;
>>>>> }
>>>>>
>>>>> try {
>>>>>     if (shouldCommit) {
>>>>>         commitSucceeded = commitTxn()
>>>>>     } else {
>>>>>         // reset offsets, rollback operations, etc
>>>>>         abortTxn()
>>>>>     }
>>>>> } catch (KafkaException) {
>>>>>     // only throw fatal
>>>>>     shutdown()
>>>>> }
>>>>>
>>>>> if (commitSucceeded)
>>>>>     // do other non-rollbackable things
>>>>> else
>>>>>     // reset offsets, rollback operations, etc
>>>>>
>>>>> -------------------------
>>>>>
>>>>> Of course, if we want better visibility into what caused the commit to fail and the txn to abort, we can let the return type be an enum instead of a flag. But my main idea is still to let commitTxn be the final point at which users can tell whether this txn succeeded, instead of relying on the next beginTxn() call.
>>>>
>>>> I agree that adding a boolean flag is indeed useful in this case. Will update the KIP.
>>>>
>>>>> 1. Re: "while maintaining the behavior to throw fatal exception in raw", not sure what you mean by "throw" here. Are you proposing the callback would *pass* in (not throw) any fatal exceptions, without wrapping, when triggered?
>>>>
>>>> Yes, I meant *pass*; the benefit is to make the end user's expectations about exception handling consistent.
>>>>
>>>>> 2. I'm not sure I understand the claim regarding the callback that "In EOS setup, it is not required to handle the exception". Are you proposing that, e.g. in Streams, we no longer try to handle any exceptions in the callback if EOS is enabled, but just let commitTxn() itself fail to surface the problem? Personally I think in Streams we should make the handling logic of the callback consistent regardless of the EOS setting (today we have different logic depending on this setting, which I think could be unified as well).
>>>>
>>>> My idea originates from the claim on the send API: "When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from <code>send</code>."
>>>> My understanding is that for EOS, the exception will be detected by calling transactional APIs either way, so tracking all the sendExceptions in RecordCollector is duplicate handling. However, I looked up how sendException is being used today, as follows:
>>>>
>>>> 1. Pass to "ProductionExceptionHandler" for any default or customized exception handler to handle
>>>> 2. Stop collecting offset info or new exceptions
>>>> 3. Check and rethrow exceptions in close(), flush() or new send() calls
>>>>
>>>> To my understanding, we should still honor the commitment to #1 for any user customized implementation. #2 does not really affect our decision regarding EOS. #3 is still valuable, as it could help us fail fast in a new send() instead of waiting until a later stage of processing. In that sense, I agree we should continue to record send exceptions even in the EOS case, to keep the Streams-side producer logic robust.
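The three current uses of `sendException` listed above can be sketched as follows. `SendErrorRecorder` is a hypothetical, heavily simplified stand-in for Streams' `RecordCollector`. The `ProductionExceptionHandler` is modeled as a plain `Predicate<Exception>` (true = fail, false = continue) rather than its real interface. A failed send is passed to the handler and recorded if the handler says to fail (#1). Offsets and later errors are ignored once an error is recorded (#2). The recorded error is rethrown on the next `send()`, `flush()` or `close()` (#3).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a Streams-style record collector.
class SendErrorRecorder {
    // Stand-in for ProductionExceptionHandler: true = fail, false = continue.
    private final Predicate<Exception> shouldFail;
    private final Map<String, Long> offsets = new HashMap<>();
    private Exception sendException = null;

    SendErrorRecorder(Predicate<Exception> shouldFail) {
        this.shouldFail = shouldFail;
    }

    /** Called from the send callback; error is null on success. */
    void onCompletion(String partition, long offset, Exception error) {
        if (sendException != null) {
            return; // #2: stop collecting offsets and new exceptions after a failure
        }
        if (error == null) {
            offsets.put(partition, offset);
        } else if (shouldFail.test(error)) { // #1: consult the handler
            sendException = error;
        }
    }

    /** #3: rethrow the recorded error so the next send/flush/close fails fast. */
    private void checkForException() {
        if (sendException != null) {
            throw new IllegalStateException("an earlier send failed", sendException);
        }
    }

    void send(String partition) {
        checkForException();
        // A real collector would hand the record to the producer here.
    }

    void flush() { checkForException(); }

    void close() { checkForException(); }

    Map<String, Long> offsets() { return offsets; }
}
```

Under EOS the same failure would also surface from the transactional APIs. Keeping #3 only makes it surface earlier, which is the fail-fast argument made above.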
>>>>
>>>> On the safer side, we no longer need to wrap certain fatal exceptions like ProducerFenced as TaskMigrated, since they should not crash the stream thread if thrown in raw format, once we adopt the new processing model in the send phase.
>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Dec 17, 2020 at 8:42 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>> Thanks for everyone's feedback so far. I have polished the KIP after an offline discussion with some folks working on EOS, to make the exception handling more lightweight. The essential change is that we are not inventing a new intermediate exception type, but instead separating a full transaction into two phases:
>>>>>>
>>>>>> 1. The data transmission phase
>>>>>> 2. The commit phase
>>>>>>
>>>>>> This way, any exception thrown in phase 1 serves as an indicator in phase 2 of whether we should commit or abort, and from now on `commitTransaction` should only throw fatal exceptions, similar to `abortTransaction`, so that any KafkaException caught in the commit phase is definitely fatal and should crash the app. For more advanced users such as Streams, we have the ability to further wrap selected types of fatal exceptions to trigger task migration if necessary.
>>>>>>
>>>>>> More details in the KIP, feel free to take another look, thanks!
>>>>>>
>>>>>> On Thu, Dec 17, 2020 at 8:36 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>> Thanks Bruno for the feedback.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 5:26 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>> Thanks Boyang for the KIP!
>>>>>>>>
>>>>>>>> Like Matthias, I also do not know the producer internals well enough to comment on the categorization. However, I think having a super exception (e.g. RetriableException) that encodes whether an exception is fatal is cleaner, easier to understand and less error-prone. Ideally, when you add a new non-fatal exception in the future, you just need to let it inherit from the super exception, and all the rest of the code will behave correctly. There is no need to wrap the new exception into another exception each time it is thrown (maybe it is thrown at different locations in the code).
>>>>>>>>
>>>>>>>> As far as I understand, the following statement from your previous e-mail is the reason that such a super exception is currently not possible:
>>>>>>>>
>>>>>>>> "Right now we have RetriableException type, if we are going to add a `ProducerRetriableException` type, we have to put this new interface as the parent of the RetriableException, because not all thrown non-fatal exceptions are `retriable` in general for producer"
>>>>>>>>
>>>>>>>> In the list of exceptions in your KIP, I found non-fatal exceptions that do not inherit from RetriableException. I guess those are the ones you are referring to in your statement:
>>>>>>>>
>>>>>>>> InvalidProducerEpochException
>>>>>>>> InvalidPidMappingException
>>>>>>>> TransactionAbortedException
>>>>>>>>
>>>>>>>> All of those exceptions are non-fatal and do not inherit from RetriableException. Is there a reason for that?
>>>>>>>> If they inherited from RetriableException, we would be a bit closer to the super exception I mentioned above.
>>>>>>>
>>>>>>> The reason is that the sender may receive those exceptions in the ProduceResponse, and it currently retries infinitely on RetriableException. To make sure we can trigger abortTransaction() by catching the thrown non-fatal exceptions and reinitializing the txn state, we chose not to let those exceptions inherit from RetriableException, so that they won't cause infinite retries in the sender.
>>>>>>>
>>>>>>>> With OutOfOrderSequenceException and UnknownProducerIdException, I think I understand that their fatality depends on the type (i.e. configuration) of the producer. That makes it difficult to have a super exception that encodes the retriability as mentioned above. Would it be possible to introduce exceptions that inherit from RetriableException? They would be thrown when those exceptions are caught from the brokers and the type of the producer is such that the exceptions are retriable.
>>>>>>>
>>>>>>> Yeah, I think in general mixing exception types makes it difficult to get them all right. I have already proposed another solution, based on my offline discussion with some folks working on EOS, to make the handling more straightforward for end users without the need to distinguish exception fatality.
>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On 04.12.20 19:34, Guozhang Wang wrote:
>>>>>>>>> Thanks Boyang for the proposal!
>>>>>>>>> I made a pass over the list and here are some thoughts:
>>>>>>>>>
>>>>>>>>> 0) Although this is not part of the public API, I think we should make sure that the suggested pattern (i.e. the user can always call abortTxn() when handling non-fatal errors) is indeed supported. E.g. if the txn has already been aborted on the broker side, users can still call abortTxn, which would not throw another exception but simply be treated as a no-op.
>>>>>>>>>
>>>>>>>>> 1) *ConcurrentTransactionsException*: I think this error can also be returned but is not documented yet. This should be a non-fatal error.
>>>>>>>>>
>>>>>>>>> 2) *InvalidTxnStateException*: this error is returned from the broker when a txn state transition fails (e.g. it is trying to transition to complete-commit while the current state is not prepare-commit). This error could indicate a bug in the client internal code or the broker code, OR a user error. ConcurrentTransactionsException is a similar error: if Kafka is bug-free, these exceptions would only be returned if users try to do something wrong, e.g. calling abortTxn right after a commitTxn. So I'm thinking it should be a non-fatal error instead of a fatal error, wdyt?
>>>>>>>>>
>>>>>>>>> 3) *KafkaException*: case i, "indicates fatal transactional sequence (Fatal)", is a bit conflicting with the *OutOfOrderSequenceException* that is treated as non-fatal.
>>>>>>>>> I guess your proposal is that OutOfOrderSequenceException would be treated as fatal with a transactional producer, or as non-fatal with an idempotent producer, is that right? If the producer is only configured with idempotency but not transactions, then throwing a TransactionStateCorruptedException for non-fatal errors would be confusing, since users are not using transactions at all. So I suggest we always throw OutOfOrderSequenceException as-is (i.e. not wrapped) no matter how the producer is configured. The caller can then decide how to handle it based on whether it is only idempotent or transactional.
>>>>>>>>>
>>>>>>>>> 4) Besides all the txn APIs, the `send()` callback / future can also throw txn-related exceptions; I think this KIP should cover this API as well?
>>>>>>>>>
>>>>>>>>> 5) This is related to 1/2) above: sometimes non-fatal errors like ConcurrentTxn or InvalidTxnState are not due to the state being corrupted on the broker side, but rather to users doing something wrong. So I'm wondering if we should further split those non-fatal errors into two groups:
>>>>>>>>> a) those caused by Kafka itself, e.g. a broker timed out and aborted a txn and later an endTxn request is received;
>>>>>>>>> b) those where the user's API call pattern is incorrect, causing the request to be rejected with an error code from the broker.
>>>>>>>>> *TransactionStateCorruptedException* feels to me more suited to case a) than to case b).
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Wed, Dec 2, 2020 at 4:50 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>>>>> Thanks Matthias, I think your proposal makes sense as well. On the pro side, we could have a universally agreed exception type to be caught by the user, without an extra layer on top of the actual exceptions. I can see some downsides, though:
>>>>>>>>>>
>>>>>>>>>> 1. The exception hierarchy will be more complex. Right now we have RetriableException type, if we are going to add a `ProducerRetriableException` type, we have to put this new interface as the parent of the RetriableException, because not all thrown non-fatal exceptions are `retriable` in general for producer, for example <https://github.com/apache/kafka/blob/e275742f850af4a1b79b0d1bd1ac9a1d2e89c64e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L745>. We could have an empty interface `ProducerRetriableException` for all the thrown exceptions to implement, but it's a bit unorthodox compared to the way we deal with exceptions in general.
>>>>>>>>>>
>>>>>>>>>> 2. There are cases where we throw a KafkaException wrapping another KafkaException as either fatal or non-fatal.
>>>>>>>>>> If we use an interface to solve #1, we would also need to implement another bloated exception class to replace the KafkaException type, as we couldn't mark KafkaException as retriable for sure.
>>>>>>>>>>
>>>>>>>>>> 3. In terms of encapsulation, wrapping means we could limit the scope of impact to the producer only. This is important since we don't want shared exception types, such as UnknownTopicOrPartitionException, to implement a producer-related interface.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Boyang
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 2, 2020 at 3:38 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>> Thanks for the KIP Boyang!
>>>>>>>>>>>
>>>>>>>>>>> Overall, categorizing exceptions makes a lot of sense. As I don't know the producer internals well enough, I cannot comment on the categorization in detail though.
>>>>>>>>>>>
>>>>>>>>>>> What I am wondering is whether we should introduce an exception interface that non-fatal exceptions would implement, instead of creating a new class that wraps non-fatal exceptions. What would be the pros/cons of both designs?
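Matthias's question can be made concrete with a small sketch of both designs. All names are hypothetical. The marker interface plays the role of the `ProducerRetriableException` idea Boyang mentions, and nothing here is the KIP's final API. With the interface, fatality is fixed by the exception class. With the wrapper, it is decided at the throw site. That is what lets one shared type be non-fatal in one producer configuration and fatal in another, without that type implementing a producer-specific interface.

```java
// Design 1 (marker interface): non-fatal exception classes implement it directly.
interface SketchNonFatal { }

class SketchEpochException extends RuntimeException implements SketchNonFatal {
    SketchEpochException(String message) { super(message); }
}

class SketchFencedException extends RuntimeException { // fatal: carries no marker
    SketchFencedException(String message) { super(message); }
}

// Design 2 (wrapper class): the producer wraps a non-fatal root cause where it throws.
class SketchNonFatalWrapper extends RuntimeException {
    SketchNonFatalWrapper(Throwable cause) { super(cause); }
}

class SketchFatality {
    // Design 1: the classification is fixed by the exception's type.
    static boolean nonFatalByMarker(Throwable t) {
        return t instanceof SketchNonFatal;
    }

    // Design 2: the classification is decided at the throw site, so the same
    // type can be fatal in one context and non-fatal in another (e.g. depending
    // on whether the producer is transactional or only idempotent).
    static RuntimeException maybeWrap(RuntimeException cause, boolean nonFatalHere) {
        return nonFatalHere ? new SketchNonFatalWrapper(cause) : cause;
    }

    static boolean nonFatalByWrapper(Throwable t) {
        return t instanceof SketchNonFatalWrapper;
    }
}
```

The cost of the wrapper design is visible in `maybeWrap`: every throw site has to make the decision. The cost of the marker design is that shared types such as an unknown-topic error would need to carry a producer-only interface.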
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 12/2/20 11:35 AM, Boyang Chen wrote:
>>>>>>>>>>>> Hey there,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a discussion thread for KIP-691:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
>>>>>>>>>>>>
>>>>>>>>>>>> The KIP aims to simplify the exception handling logic for transactional Producer users by classifying fatal and non-fatal exceptions and throwing them accordingly, for easier catching and retrying. Let me know what you think.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Boyang

--
-- Guozhang
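For comparison with the `CommitFailedException` template the thread converged on, Guozhang's Jan 18 boolean-flag alternative can be sketched as below. `FlagTxnClient` and a `commitTxn()` that returns `boolean` are hypothetical; the flag was later dropped because existing EOS code would ignore it. The sketch shows the property Guozhang wanted: the non-rollback side effect, here a stand-in list of "external commits", runs only when the commit is known to have succeeded. Fatal errors would propagate as exceptions and are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client whose commitTxn() reports a non-fatal failure via its return value.
class FlagTxnClient {
    boolean nextCommitFails = false; // simulation knob
    boolean aborted = false;

    void beginTxn() { aborted = false; }

    /** true: committed; false: non-fatal failure AND the txn was already aborted. */
    boolean commitTxn() {
        if (nextCommitFails) {
            nextCommitFails = false;
            aborted = true; // per the proposal, the caller need not call abortTxn()
            return false;
        }
        return true;
    }

    void abortTxn() { aborted = true; }
}

class FlagCommitPattern {
    // Stands in for the non-rollback side effect (e.g. committing a local DB).
    static final List<String> externalCommits = new ArrayList<>();

    static void processBatch(FlagTxnClient client, String batch, boolean processingOk) {
        client.beginTxn();
        boolean shouldCommit = processingOk; // outcome of the data transmission phase
        boolean commitSucceeded = false;
        if (shouldCommit) {
            commitSucceeded = client.commitTxn();
        } else {
            client.abortTxn();
        }
        if (commitSucceeded) {
            externalCommits.add(batch); // safe: the Kafka txn is known to be committed
        }
        // else: reset offsets / roll back local work (omitted in this sketch)
    }
}
```

With the `CommitFailedException` variant, the same guarantee holds as long as the external commit sits right after `commitTransaction()` inside the `try`. In that position it is skipped whenever the commit throws.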