What I mean is the following:

For both scenarios, late message or missing addPartitionToTxnRequest, a record r is written to partition X, but X is not registered at the TX-coordinator. Now there are two cases:

(1) A follow up transaction writes more data to the same partition X, and r becomes part of the follow up transaction. This is an error obviously, but we don't get a hanging transaction.

(2) X is not part of any follow up transaction and thus X starts to block consumer reading data.

If we let the partition leader send the addPartitionToTxnRequest to the TX-coordinator, scenario (2) always turns into scenario (1) -- at least, if there is one more transaction for this producer (what I think we can assume). Even if the follow up transaction doesn't write data to X, X still becomes part of the TX and X won't hang and won't block consumers any longer.

We still end up with not fixing (1) though... Your proposal seems to address case (1) in addition to case (2), at least for most cases. There is still the race condition (that we cannot fix without the epoch bump) that r comes in _very_ late, and the follow up transaction would have written more data to X already, and thus X is indeed already registered and r would just be added successfully. Of course, the race condition window is much smaller, so your proposal is much better than what I had in mind.


-Matthias

On 12/14/22 10:43 AM, Justine Olshan wrote:
Matthias — thanks again for taking time to look a this. You said:

My proposal was only focusing to avoid dangling

transactions if records are added without registered partition. -- Maybe

you can add a few more details to the KIP about this scenario for better

documentation purpose?


I'm not sure I understand what you mean here. The motivation section
describes two scenarios about how the record can be added without a
registered partition:


This can happen when a message gets stuck or delayed due to networking
issues or a network partition, the transaction aborts, and then the delayed
message finally comes in.


Another way hanging transactions can occur is that a client is buggy and
may somehow try to write to a partition before it adds the partition to the
transaction.



For the first example of this would it be helpful to say that this message
comes in after the abort, but before the partition is added to the next
transaction so it becomes "hanging." Perhaps the next sentence describing
the message becoming part of the next transaction (a different case) was
not properly differentiated.



Jun — thanks for reading the KIP.

70. The int typing was a concern. Currently we have a mechanism in place to
fence the final epoch when the epoch is about to overflow and assign a new
producer ID with epoch 0. Of course, this is a bit tricky when it comes to
the response back to the client.
Making this a long could be another option, but I wonder are there any
implications on changing this field if the epoch is persisted to disk? I'd
need to check the usages.

71.This was something Matthias asked about as well. I was considering a
possible edge case where a produce request from a new transaction somehow
gets sent right after the marker is written, but before the producer is
alerted of the newly bumped epoch. In this case, we may include this record
when we don't want to. I suppose we could try to do something client side
to bump the epoch after sending an endTxn as well in this scenario — but I
wonder how it would work when the server is aborting based on a server-side
error. I could also be missing something and this scenario is actually not
possible.

Thanks again to everyone reading and commenting. Let me know about any
further questions or comments.

Justine

On Wed, Dec 14, 2022 at 9:41 AM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Justine,

Thanks for the KIP. A couple of comments.

70. Currently, the producer epoch is an int. I am not sure if it's enough
to accommodate all transactions in the lifetime of a producer. Should we
change that to a long or add a new long field like txnId?

71. "it will write the prepare commit message with a bumped epoch and send
WriteTxnMarkerRequests with the bumped epoch." Hmm, the epoch is associated
with the current txn right? So, it seems weird to write a commit message
with a bumped epoch. Should we only bump up the epoch in EndTxnResponse and
rename the field to sth like nextProducerEpoch?

Thanks,

Jun



On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the background.

20/30: SGTM. My proposal was only focusing to avoid dangling
transactions if records are added without registered partition. -- Maybe
you can add a few more details to the KIP about this scenario for better
documentation purpose?

40: I think you hit a fair point about race conditions or client bugs
(incorrectly not bumping the epoch). The complexity/confusion for using
the bumped epoch I see, is mainly for internal debugging, ie, inspecting
log segment dumps -- it seems harder to reason about the system for us
humans. But if we get better guarantees, it would be worth to use the
bumped epoch.

60: as I mentioned already, I don't know the broker internals to provide
more input. So if nobody else chimes in, we should just move forward
with your proposal.


-Matthias


On 12/6/22 4:22 PM, Justine Olshan wrote:
Hi all,
After Artem's questions about error behavior, I've re-evaluated the
unknown producer ID exception and had some discussions offline.

I think generally it makes sense to simplify error handling in cases
like
this and the UNKNOWN_PRODUCER_ID error has a pretty long and
complicated
history. Because of this, I propose adding a new error code
ABORTABLE_ERROR
that when encountered by new clients (gated by the produce request
version)
will simply abort the transaction. This allows the server to have some
say
in whether the client aborts and makes handling much simpler. In the
future, we can also use this error in other situations where we want to
abort the transactions. We can even use on other apis.

I've added this to the KIP. Let me know if there are any questions or
issues.

Justine

On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan <jols...@confluent.io>
wrote:

Hey Matthias,


20/30 — Maybe I also didn't express myself clearly. For older clients
we
don't have a way to distinguish between a previous and the current
transaction since we don't have the epoch bump. This means that a late
message from the previous transaction may be added to the new one.
With
older clients — we can't guarantee this won't happen if we already
sent
the
addPartitionsToTxn call (why we make changes for the newer client) but
we
can at least gate some by ensuring that the partition has been added
to
the
transaction. The rationale here is that there are likely LESS late
arrivals
as time goes on, so hopefully most late arrivals will come in BEFORE
the
addPartitionsToTxn call. Those that arrive before will be properly
gated
with the describeTransactions approach.

If we take the approach you suggested, ANY late arrival from a
previous
transaction will be added. And we don't want that. I also don't see
any
benefit in sending addPartitionsToTxn over the describeTxns call. They
will
both be one extra RPC to the Txn coordinator.


To be clear — newer clients will use addPartitionsToTxn instead of the
DescribeTxns.


40)
My concern is that if we have some delay in the client to bump the
epoch,
it could continue to send epoch 73 and those records would not be
fenced.
Perhaps this is not an issue if we don't allow the next produce to go
through before the EndTxn request returns. I'm also thinking about
cases of
failure. I will need to think on this a bit.

I wasn't sure if it was that confusing. But if we think it is, we can
investigate other ways.


60)

I'm not sure these are the same purgatories since one is a produce
purgatory (I was planning on using a callback rather than purgatory)
and
the other is simply a request to append to the log. Not sure we have
any
structure here for ordering, but my understanding is that the broker
could
handle the write request before it hears back from the Txn
Coordinator.

Let me know if I misunderstood something or something was unclear.

Justine

On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the details Justine!

20)

The client side change for 2 is removing the addPartitions to
transaction
call. We don't need to make this from the producer to the txn
coordinator,
only server side.

I think I did not express myself clearly. I understand that we can
(and
should) change the producer to not send the `addPartitions` request
any
longer. But I don't thinks it's requirement to change the broker?

What I am trying to say is: as a safe-guard and improvement for older
producers, the partition leader can just send the `addPartitions`
request to the TX-coordinator in any case -- if the old producer
correctly did send the `addPartition` request to the TX-coordinator
already, the TX-coordinator can just "ignore" is as idempotent.
However,
if the old producer has a bug and did forget to sent the
`addPartition`
request, we would now ensure that the partition is indeed added to
the
TX and thus fix a potential producer bug (even if we don't get the
fencing via the bump epoch). -- It seems to be a good improvement? Or
is
there a reason to not do this?



30)

Transaction is ongoing = partition was added to transaction via
addPartitionsToTxn. We check this with the DescribeTransactions
call.
Let
me know if this wasn't sufficiently explained here:

If we do what I propose in (20), we don't really need to make this
`DescribeTransaction` call, as the partition leader adds the
partition
for older clients and we get this check for free.


40)

The idea here is that if any messages somehow come in before we get
the
new
epoch to the producer, they will be fenced. However, if we don't
think
this
is necessary, it can be discussed

I agree that we should have epoch fencing. My question is different:
Assume we are at epoch 73, and we have an ongoing transaction, that
is
committed. It seems natural to write the "prepare commit" marker and
the
`WriteTxMarkerRequest` both with epoch 73, too, as it belongs to the
current transaction. Of course, we now also bump the epoch and expect
the next requests to have epoch 74, and would reject an request with
epoch 73, as the corresponding TX for epoch 73 was already committed.

It seems you propose to write the "prepare commit marker" and
`WriteTxMarkerRequest` with epoch 74 though, what would work, but it
seems confusing. Is there a reason why we would use the bumped epoch
74
instead of the current epoch 73?


60)

When we are checking if the transaction is ongoing, we need to make
a
round
trip from the leader partition to the transaction coordinator. In
the
time
we are waiting for this message to come back, in theory we could
have
sent
a commit/abort call that would make the original result of the check
out of
date. That is why we can check the leader state before we write to
the
log.

Thanks. Got it.

However, is this really an issue? We put the produce request in
purgatory, so how could we process the `WriteTxnMarkerRequest` first?
Don't we need to put the `WriteTxnMarkerRequest` into purgatory, too,
for this case, and process both request in-order? (Again, my broker
knowledge is limited and maybe we don't maintain request order for
this
case, what seems to be an issue IMHO, and I am wondering if changing
request handling to preserve order for this case might be the cleaner
solution?)



-Matthias




On 11/30/22 3:28 PM, Artem Livshits wrote:
Hi Justine,

I think the interesting part is not in this logic (because it tries
to
figure out when UNKNOWN_PRODUCER_ID is retriable and if it's
retryable,
it's definitely not fatal), but what happens when this logic doesn't
return
'true' and falls through.  In the old clients it seems to be fatal,
if
we
keep the behavior in the new clients, I'd expect it would be fatal
as
well.

-Artem

On Tue, Nov 29, 2022 at 11:57 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Hi Artem and Jeff,


Thanks for taking a look and sorry for the slow response.

You both mentioned the change to handle UNKNOWN_PRODUCER_ID errors.
To
be
clear — this error code will only be sent again when the client's
request
version is high enough to ensure we handle it correctly.
The current (Java) client handles this by the following (somewhat
long)
code snippet:

// An UNKNOWN_PRODUCER_ID means that we have lost the producer
state
on the
broker. Depending on the log start

// offset, we may want to retry these, as described for each case
below. If
none of those apply, then for the

// idempotent producer, we will locally bump the epoch and reset
the
sequence numbers of in-flight batches from

// sequence 0, then retry the failed batch, which should now
succeed.
For
the transactional producer, allow the

// batch to fail. When processing the failed batch, we will
transition
to
an abortable error and set a flag

// indicating that we need to bump the epoch (if supported by the
broker).

if (error == Errors.*UNKNOWN_PRODUCER_ID*) {

       if (response.logStartOffset == -1) {

           // We don't know the log start offset with this response.
We
should
just retry the request until we get it.

           // The UNKNOWN_PRODUCER_ID error code was added along
with
the new
ProduceResponse which includes the

           // logStartOffset. So the '-1' sentinel is not for
backward
compatibility. Instead, it is possible for

           // a broker to not know the logStartOffset at when it is
returning
the response because the partition

           // may have moved away from the broker from the time the
error was
initially raised to the time the

           // response was being constructed. In these cases, we
should
just
retry the request: we are guaranteed

           // to eventually get a logStartOffset once things settle
down.

           return true;

       }


       if (batch.sequenceHasBeenReset()) {

           // When the first inflight batch fails due to the
truncation
case,
then the sequences of all the other

           // in flight batches would have been restarted from the
beginning.
However, when those responses

           // come back from the broker, they would also come with
an
UNKNOWN_PRODUCER_ID error. In this case, we should not

           // reset the sequence numbers to the beginning.

           return true;

       } else if (lastAckedOffset(batch.topicPartition).orElse(
*NO_LAST_ACKED_SEQUENCE_NUMBER*) < response.logStartOffset) {

           // The head of the log has been removed, probably due to
the
retention time elapsing. In this case,

           // we expect to lose the producer state. For the
transactional
producer, reset the sequences of all

           // inflight batches to be from the beginning and retry
them,
so
that the transaction does not need to

           // be aborted. For the idempotent producer, bump the
epoch
to
avoid
reusing (sequence, epoch) pairs

           if (isTransactional()) {


txnPartitionMap.startSequencesAtBeginning(batch.topicPartition,
this.producerIdAndEpoch);

           } else {

               requestEpochBumpForPartition(batch.topicPartition);

           }

           return true;

       }


       if (!isTransactional()) {

           // For the idempotent producer, always retry
UNKNOWN_PRODUCER_ID
errors. If the batch has the current

           // producer ID and epoch, request a bump of the epoch.
Otherwise
just retry the produce.

           requestEpochBumpForPartition(batch.topicPartition);

           return true;

       }

}


I was considering keeping this behavior — but am open to
simplifying
it.



We are leaving changes to older clients off the table here since it
caused
many issues for clients in the past. Previously this was a fatal
error
and
we didn't have the mechanisms in place to detect when this was a
legitimate
case vs some bug or gap in the protocol. Ensuring each transaction
has
its
own epoch should close this gap.




And to address Jeff's second point:
*does the typical produce request path append records to local log
along*

*with the currentTxnFirstOffset information? I would like to
understand*

*when the field is written to disk.*


Yes, the first produce request populates this field and writes the
offset
as part of the record batch and also to the producer state
snapshot.
When
we reload the records on restart and/or reassignment, we repopulate
this
field with the snapshot from disk along with the rest of the
producer
state.

Let me know if there are further comments and/or questions.

Thanks,
Justine

On Tue, Nov 22, 2022 at 9:00 PM Jeff Kim
<jeff....@confluent.io.invalid

wrote:

Hi Justine,

Thanks for the KIP! I have two questions:

1) For new clients, we can once again return an error
UNKNOWN_PRODUCER_ID
for sequences
that are non-zero when there is no producer state present on the
server.
This will indicate we missed the 0 sequence and we don't yet want
to
write
to the log.

I would like to understand the current behavior to handle older
clients,
and if there are any changes we are making. Maybe I'm missing
something,
but we would want to identify whether we missed the 0 sequence for
older
clients, no?

2) Upon returning from the transaction coordinator, we can set the
transaction
as ongoing on the leader by populating currentTxnFirstOffset
through the typical produce request handling.

does the typical produce request path append records to local log
along
with the currentTxnFirstOffset information? I would like to
understand
when the field is written to disk.

Thanks,
Jeff


On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Justine,

Thank you for the KIP.  I have one question.

5) For new clients, we can once again return an error
UNKNOWN_PRODUCER_ID

I believe we had problems in the past with returning
UNKNOWN_PRODUCER_ID
because it was considered fatal and required client restart.  It
would
be
good to spell out the new client behavior when it receives the
error.

-Artem

On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Thanks for taking a look Matthias. I've tried to answer your
questions
below:

10)

Right — so the hanging transaction only occurs when we have a
late
message
come in and the partition is never added to a transaction again.
If
we
never add the partition to a transaction, we will never write a
marker
and
never advance the LSO.

If we do end up adding the partition to the transaction (I
suppose
this
can
happen before or after the late message comes in) then we will
include
the
late message in the next (incorrect) transaction.

So perhaps it is clearer to make the distinction between
messages
that
eventually get added to the transaction (but the wrong one) or
messages
that never get added and become hanging.


20)

The client side change for 2 is removing the addPartitions to
transaction
call. We don't need to make this from the producer to the txn
coordinator,
only server side.


In my opinion, the issue with the addPartitionsToTxn call for
older
clients
is that we don't have the epoch bump, so we don't know if the
message
belongs to the previous transaction or this one. We need to
check
if
the
partition has been added to this transaction. Of course, this
means
we
won't completely cover the case where we have a really late
message
and
we
have added the partition to the new transaction, but that's
unfortunately
something we will need the new clients to cover.


30)

Transaction is ongoing = partition was added to transaction via
addPartitionsToTxn. We check this with the DescribeTransactions
call.
Let
me know if this wasn't sufficiently explained here:







https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)


40)

The idea here is that if any messages somehow come in before we
get
the
new
epoch to the producer, they will be fenced. However, if we don't
think
this
is necessary, it can be discussed


50)

It should be synchronous because if we have an event (ie, an
error)
that
causes us to need to abort the transaction, we need to know
which
partitions to send transaction markers to. We know the
partitions
because
we added them to the coordinator via the addPartitionsToTxn
call.
Previously we have had asynchronous calls in the past (ie,
writing
the
commit markers when the transaction is completed) but often this
just
causes confusion as we need to wait for some operations to
complete.
In
the
writing commit markers case, clients often see
CONCURRENT_TRANSACTIONs
error messages and that can be confusing. For that reason, it
may
be
simpler to just have synchronous calls — especially if we need
to
block
on
some operation's completion anyway before we can start the next
transaction. And yes, I meant coordinator. I will fix that.


60)

When we are checking if the transaction is ongoing, we need to
make
a
round
trip from the leader partition to the transaction coordinator.
In
the
time
we are waiting for this message to come back, in theory we could
have
sent
a commit/abort call that would make the original result of the
check
out
of
date. That is why we can check the leader state before we write
to
the
log.


I'm happy to update the KIP if some of these things were not
clear.
Thanks,
Justine

On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax <
mj...@apache.org

wrote:

Thanks for the KIP.

Couple of clarification questions (I am not a broker expert do
maybe
some question are obvious for others, but not for me with my
lack
of
broker knowledge).



(10)

The delayed message case can also violate EOS if the delayed
message
comes in after the next addPartitionsToTxn request comes in.
Effectively
we
may see a message from a previous (aborted) transaction become
part
of
the
next transaction.

What happens if the message come in before the next
addPartitionsToTxn
request? It seems the broker hosting the data partitions won't
know
anything about it and append it to the partition, too? What is
the
difference between both cases?

Also, it seems a TX would only hang, if there is no following
TX
that
is
either committer or aborted? Thus, for the case above, the TX
might
actually not hang (of course, we might get an EOS violation if
the
first
TX was aborted and the second committed, or the other way
around).


(20)

Of course, 1 and 2 require client-side changes, so for older
clients,
those approaches won’t apply.

For (1) I understand why a client change is necessary, but not
sure
why
we need a client change for (2). Can you elaborate? -- Later
you
explain
that we should send a DescribeTransactionRequest, but I am not
sure
why?
Can't we not just do an implicit AddPartiitonToTx, too? If the
old
producer correctly registered the partition already, the
TX-coordinator
can just ignore it as it's an idempotent operation?


(30)

To cover older clients, we will ensure a transaction is
ongoing
before
we write to a transaction

Not sure what you mean by this? Can you elaborate?


(40)

[the TX-coordinator] will write the prepare commit message
with
a
bumped
epoch and send WriteTxnMarkerRequests with the bumped epoch.

Why do we use the bumped epoch for both? It seems more
intuitive
to
use
the current epoch, and only return the bumped epoch to the
producer?


(50) "Implicit AddPartitionToTransaction"

Why does the implicitly sent request need to be synchronous?
The
KIP
also says

in case we need to abort and need to know which partitions

What do you mean by this?


we don’t want to write to it before we store in the
transaction
manager

Do you mean TX-coordinator instead of "manager"?


(60)

For older clients and ensuring that the TX is ongoing, you
describe a
race condition. I am not sure if I can follow here. Can you
elaborate?



-Matthias



On 11/18/22 1:21 PM, Justine Olshan wrote:
Hey all!

I'd like to start a discussion on my proposal to add some
server-side
checks on transactions to avoid hanging transactions. I know
this
has
been
an issue for some time, so I really hope this KIP will be
helpful
for
many
users of EOS.

The KIP includes changes that will be compatible with old
clients
and
changes to improve performance and correctness on new clients.

Please take a look and leave any comments you may have!

KIP:








https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
JIRA: https://issues.apache.org/jira/browse/KAFKA-14402

Thanks!
Justine













Reply via email to