Hi Luke & Artem,

We have prepared the fix. Could you please help us find a committer to
review it, so that this issue can be resolved?

Thanks,
Viktor

On Fri, Jul 8, 2022 at 12:57 PM Dániel Urbán <urb.dani...@gmail.com> wrote:

> Submitted a PR with the fix: https://github.com/apache/kafka/pull/12392
> In the PR I tried keeping the producer in a usable state after the forced
> bump. I understand that it might not be the cleanest solution, but the only
> other option I know of is to transition into a fatal state, meaning that
> the producer has to be recreated after a delivery timeout. I think that is
> still fine compared to the out-of-order messages.
>
> Looking forward to your reviews,
> Daniel
>
> Dániel Urbán <urb.dani...@gmail.com> wrote (on Thu, Jul 7, 2022, 12:04):
>
> > Thanks for the feedback, I created
> > https://issues.apache.org/jira/browse/KAFKA-14053 and started working on
> > a PR.
> >
> > Luke, for the workaround, we used the transaction admin tool released in
> > 3.0 to "abort" these hanging batches manually.
> > Naturally, the cluster health should be stabilized. This issue popped up
> > most frequently around times when some partitions went into a few-minute
> > window of unavailability. The infinite retries on the producer side caused
> > a situation where the last retry was still in flight, but the delivery
> > timeout was triggered on the client side. We reduced the retries and
> > increased the delivery timeout to avoid such situations.
> > Still, the issue can occur in other scenarios, for example when a client
> > queues up many batches in the producer buffer, causing those batches
> > to spend most of the delivery timeout window in client memory.
> >
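The workaround described above (bounded retries plus a longer delivery timeout) could look roughly like the following. This is only a sketch: the config keys are standard producer settings, but every value, the bootstrap address, the transactional.id, and the names ProducerTimeoutSketch, boundedRetryConfig and isConsistent are illustrative assumptions, not the settings used in the cluster discussed here.

```java
import java.util.Properties;

final class ProducerTimeoutSketch {

    // Builds producer settings with bounded retries and a generous delivery timeout.
    // All values are illustrative assumptions, not recommendations from this thread.
    static Properties boundedRetryConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("transactional.id", "example-app-1");   // placeholder id
        props.setProperty("enable.idempotence", "true");          // required for transactions
        props.setProperty("acks", "all");
        props.setProperty("retries", "10");                       // bounded instead of "infinite"
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("linger.ms", "5");
        props.setProperty("delivery.timeout.ms", "300000");       // longer overall budget
        return props;
    }

    // The producer expects delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean isConsistent(Properties props) {
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties props = boundedRetryConfig();
        System.out.println("timeouts consistent: " + isConsistent(props));
    }
}
```

These properties would be passed to a KafkaProducer constructor. Tuning them only makes the race less likely; it does not remove it, which is why the client-side fix is still needed.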
> > Thanks,
> > Daniel
> >
> > Luke Chen <show...@gmail.com> wrote (on Thu, Jul 7, 2022, 5:13):
> >
> >> Hi Daniel,
> >>
> >> Thanks for reporting the issue and for the investigation.
> >> I'm curious: what is your workaround for this issue?
> >>
> >> I agree with Artem, it makes sense. Please file a bug in JIRA.
> >> And looking forward to your PR! :)
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
> >> <alivsh...@confluent.io.invalid> wrote:
> >>
> >> > Hi Daniel,
> >> >
> >> > What you say makes sense.  Could you file a bug and put this info there
> >> > so that it's easier to track?
> >> >
> >> > -Artem
> >> >
> >> > On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
> >> >
> >> > > Hello everyone,
> >> > >
> >> > > I've been investigating some transaction-related issues in a very
> >> > > problematic cluster. Besides finding some interesting issues, I had
> >> > > some ideas about how transactional producer behavior could be improved.
> >> > >
> >> > > My suggestion in short is: when the transactional producer encounters
> >> > > an error which doesn't necessarily mean that the in-flight request was
> >> > > processed (for example a client side timeout), the producer should not
> >> > > send an EndTxnRequest on abort, but instead it should bump the producer
> >> > > epoch.
> >> > >
> >> > > The long description about the issue I found, and how I came to the
> >> > > suggestion:
> >> > >
> >> > > First, the description of the issue. When I say that the cluster is
> >> > > "very problematic", I mean all kinds of different issues, be it infra
> >> > > (disks and network) or throughput (high volume producers without fine
> >> > > tuning).
> >> > > In this cluster, Kafka transactions are widely used by many producers.
> >> > > And in this cluster, partitions get "stuck" frequently (a few times
> >> > > every week).
> >> > >
> >> > > The exact meaning of a partition being "stuck" is this:
> >> > >
> >> > > On the client side:
> >> > > 1. A transactional producer sends X batches to a partition in a single
> >> > > transaction
> >> > > 2. Out of the X batches, the last few get sent, but are timed out
> >> > > because of the delivery timeout config
> >> > > 3. producer.flush() is unblocked due to all batches being "finished"
> >> > > 4. Based on the errors reported in the producer.send() callback,
> >> > > producer.abortTransaction() is called
> >> > > 5. Then producer.close() is also invoked with a 5s timeout (this
> >> > > application does not reuse the producer instances optimally)
> >> > > 6. The transactional.id of the producer is never reused (it was
> >> > > randomly generated)
> >> > >
> >> > > On the partition leader side (what appears in the log segment of the
> >> > > partition):
> >> > > 1. The batches sent by the producer are all appended to the log
> >> > > 2. But the ABORT marker of the transaction was appended before the
> >> > > last 1 or 2 batches of the transaction
> >> > >
> >> > > On the transaction coordinator side (what appears in the transaction
> >> > > state partition):
> >> > > The transactional.id is present with the Empty state.
> >> > >
> >> > > These happenings result in the following:
> >> > > 1. The partition leader handles the first batch after the ABORT marker
> >> > > as the first message of a new transaction of the same producer id +
> >> > > epoch. (LSO is blocked at this point)
> >> > > 2. The transaction coordinator is not aware of any in-progress
> >> > > transaction of the producer, so it never aborts the transaction, not
> >> > > even after the transaction.timeout.ms passes.
> >> > >
> >> > > This is happening with Kafka 2.5 running in the cluster; producer
> >> > > versions range between 2.0 and 2.6.
> >> > > I scanned through a lot of tickets, and I believe that this issue is
> >> > > not specific to these versions, and could happen with the newest
> >> > > versions as well.
> >> > > If I'm mistaken, some pointers would be appreciated.
> >> > >
> >> > > Assuming that the issue could occur with any version, I believe this
> >> > > issue boils down to one oversight on the client side:
> >> > > When a request fails without a definitive response (e.g. a delivery
> >> > > timeout), the client cannot assume that the request is "finished" and
> >> > > simply abort the transaction. If the request is still in flight, and
> >> > > the EndTxnRequest and then the WriteTxnMarkerRequest get sent and
> >> > > processed earlier, the contract is violated by the client.
> >> > > This could be avoided by providing more information to the partition
> >> > > leader. Right now, a new transactional batch signals the start of a
> >> > > new transaction, and there is no way for the partition leader to decide
> >> > > whether the batch is an out-of-order message.
> >> > > In a naive and wasteful protocol, we could have a unique transaction
> >> > > id added to each batch and marker, meaning that the leader would be
> >> > > capable of refusing batches which arrive after the control marker of
> >> > > the transaction.
> >> > > But instead of changing the log format and the protocol, we can
> >> > > achieve the same by bumping the producer epoch.
> >> > >
> >> > > Bumping the epoch has a similar effect to "changing the transaction
> >> > > id" - the in-progress transaction will be aborted with a bumped
> >> > > producer epoch, telling the partition leader about the producer epoch
> >> > > change. From this point on, any batches sent with the old epoch will be
> >> > > refused by the leader due to the fencing mechanism. It doesn't really
> >> > > matter how many batches will get appended to the log, and how many will
> >> > > be refused - this is an aborted transaction - but the out-of-order
> >> > > message cannot occur, and cannot block the LSO indefinitely.
> >> > >
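The effect of the epoch bump described above can be illustrated with a toy model of a partition leader. This is a sketch under loud assumptions: PartitionLeader, appendBatch, appendAbortMarker and the scenario method are hypothetical names invented for illustration, and the logic is a heavy simplification, not Kafka's broker code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EpochFencingSketch {

    /** Toy partition leader (hypothetical, simplified): tracks epoch and open transactions. */
    static final class PartitionLeader {
        private final Map<Long, Integer> latestEpoch = new HashMap<>();
        private final Map<Long, Boolean> openTxn = new HashMap<>();
        final List<String> log = new ArrayList<>();

        /** Appends a transactional batch unless its epoch is older than the known one. */
        boolean appendBatch(long producerId, int epoch, String payload) {
            int known = latestEpoch.getOrDefault(producerId, epoch);
            if (epoch < known) {
                return false; // fenced: stale epoch, batch is refused
            }
            latestEpoch.put(producerId, epoch);
            openTxn.put(producerId, true); // any accepted batch (re)opens a transaction
            log.add("DATA(" + payload + ", epoch=" + epoch + ")");
            return true;
        }

        /** Appends an ABORT marker; the marker may carry a bumped epoch. */
        void appendAbortMarker(long producerId, int markerEpoch) {
            latestEpoch.merge(producerId, markerEpoch, Math::max);
            openTxn.put(producerId, false);
            log.add("ABORT(epoch=" + markerEpoch + ")");
        }

        boolean hasOpenTransaction(long producerId) {
            return openTxn.getOrDefault(producerId, false);
        }
    }

    /** Replays the scenario: one batch, the abort marker, then a late in-flight batch. */
    static boolean partitionStuckAfterLateBatch(boolean bumpEpochOnAbort) {
        PartitionLeader leader = new PartitionLeader();
        long producerId = 1L;
        leader.appendBatch(producerId, 0, "batch-1");
        leader.appendAbortMarker(producerId, bumpEpochOnAbort ? 1 : 0);
        leader.appendBatch(producerId, 0, "batch-2-late"); // arrives after the marker
        return leader.hasOpenTransaction(producerId);
    }

    public static void main(String[] args) {
        System.out.println("stuck without bump: " + partitionStuckAfterLateBatch(false));
        System.out.println("stuck with bump:    " + partitionStuckAfterLateBatch(true));
    }
}
```

In this model the late batch reopens a transaction when the marker keeps the old epoch, and is refused when the marker carries the bumped epoch, which mirrors the argument made in the paragraph above.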
> >> > > My suggestion is that the TransactionManager inside the producer
> >> > > should keep track of what type of errors were encountered by the
> >> > > batches of the transaction, and categorize them along the lines of
> >> > > "definitely completed" and "might not be completed". When the
> >> > > transaction goes into an abortable state, and there is at least one
> >> > > batch categorized as "might not be completed", the EndTxnRequest should
> >> > > be skipped, and an epoch bump should be sent.
> >> > > As for what type of error counts as "might not be completed", I can
> >> > > only think of client side timeouts.
> >> > >
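The decision rule suggested above can be sketched in isolation as follows. The enum and method names (BatchOutcome, AbortAction, classify, decide) are hypothetical and are not part of the real TransactionManager; the only input assumed here is whether a batch failed with a client-side timeout.

```java
import java.util.Arrays;
import java.util.List;

final class AbortDecisionSketch {

    enum BatchOutcome { DEFINITELY_COMPLETED, MIGHT_NOT_BE_COMPLETED }

    enum AbortAction { SEND_END_TXN, BUMP_EPOCH }

    /** Assumption from the proposal: only client-side timeouts leave a batch's fate unknown. */
    static BatchOutcome classify(boolean failedWithClientSideTimeout) {
        return failedWithClientSideTimeout
                ? BatchOutcome.MIGHT_NOT_BE_COMPLETED
                : BatchOutcome.DEFINITELY_COMPLETED;
    }

    /** One uncertain batch is enough to skip the EndTxnRequest and bump the epoch instead. */
    static AbortAction decide(List<BatchOutcome> outcomes) {
        return outcomes.contains(BatchOutcome.MIGHT_NOT_BE_COMPLETED)
                ? AbortAction.BUMP_EPOCH
                : AbortAction.SEND_END_TXN;
    }

    public static void main(String[] args) {
        List<BatchOutcome> outcomes = Arrays.asList(
                classify(false),  // acknowledged or failed with a definitive broker response
                classify(true));  // expired by the delivery timeout while possibly in flight
        System.out.println("abort action: " + decide(outcomes));
    }
}
```

Keeping the rule this coarse errs on the side of bumping the epoch, which costs little for a transaction that is being aborted anyway.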
> >> > > I believe this is a relatively small change (only affects the client
> >> > > lib), but it helps in avoiding some corrupt states in Kafka
> >> > > transactions.
> >> > >
> >> > > Looking forward to your input. If it seems like a sane idea, I will go
> >> > > ahead and submit a PR for it as well.
> >> > >
> >> > > Thanks in advance,
> >> > > Daniel
> >> > >
> >> >
> >>
> >
>
