Hi Jun,

Thank you for the comments.

> 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...

I added a note that all combinations are valid.  Enable2Pc=false &
KeepPreparedTxn=true could be potentially useful for backward compatibility
with Flink, when the new version of Flink that implements KIP-319 tries to
work with a cluster that doesn't authorize 2PC.

> 11.  InitProducerIdResponse: If there is no ongoing txn, what will
OngoingTxnProducerId and OngoingTxnEpoch be set?

I added a note that they will be set to -1.  The client then will know that
there is no ongoing txn and .completeTransaction becomes a no-op (but still
required before .send is enabled).

> 12. ListTransactionsRequest related changes: It seems those are already
covered by KIP-994?

Removed from this KIP.

> 13. TransactionalLogValue ...

This is now updated to work on top of KIP-890.

> 14. "Note that the (producerId, epoch) pair that corresponds to the
ongoing transaction ...

This is now updated to work on top of KIP-890.

> 15. active-transaction-total-time-max : ...

Updated.

> 16. "transaction.two.phase.commit.enable The default would be ‘false’.
If it’s ‘false’, 2PC functionality is disabled even if the ACL is set ...

Disabling 2PC effectively removes all authorization to use it, hence I
thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate.

Do you suggest using a different error code for 2PC authorization vs some
other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) or a
different code for disabled vs. unauthorised (e.g.
TWO_PHASE_COMMIT_DISABLED) or both?

> 17. completeTransaction(). We expect this to be only used during recovery.

It can also be used if, say, a commit to the database fails and the result
is inconclusive, e.g.

1. Begin DB transaction
2. Begin Kafka transaction
3. Prepare Kafka transaction
4. Commit DB transaction
5. The DB commit fails, figure out the state of the transaction by reading
the PreparedTxnState from DB
6. Complete Kafka transaction with the PreparedTxnState.

> 18. "either prepareTransaction was called or initTransaction(true) was
called": "either" should be "neither"?

Updated.

> 19. Since InitProducerId always bumps up the epoch, it creates a
situation ...

InitProducerId only bumps the producer epoch, the ongoing transaction epoch
stays the same, no matter how many times the InitProducerId is called
before the transaction is completed.  Eventually the epoch may overflow,
and then a new producer id would be allocated, but the ongoing transaction
producer id would stay the same.

I've added a couple examples in the KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges)
that walk through some scenarios and show how the state is changed.

-Artem

On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the KIP. A few comments below.
>
> 10. For the two new fields in Enable2Pc and KeepPreparedTxn in
> InitProducerId, it would be useful to document a bit more detail on what
> values are set under what cases. For example, are all four combinations
> valid?
>
> 11.  InitProducerIdResponse: If there is no ongoing txn, what will
> OngoingTxnProducerId and OngoingTxnEpoch be set?
>
> 12. ListTransactionsRequest related changes: It seems those are already
> covered by KIP-994?
>
> 13. TransactionalLogValue: Could we name TransactionProducerId and
> ProducerId better? It's not clear from the name which is for which.
>
> 14. "Note that the (producerId, epoch) pair that corresponds to the ongoing
> transaction is going to be written instead of the existing ProducerId and
> ProducerEpoch fields (which are renamed to reflect the semantics) to
> support downgrade.": I am a bit confused on that. Are we writing different
> values to the existing fields? Then, we can't downgrade, right?
>
> 15. active-transaction-total-time-max : Would
> active-transaction-open-time-max be more intuitive? Also, could we include
> the full name (group, tags, etc)?
>
> 16. "transaction.two.phase.commit.enable The default would be ‘false’.  If
> it’s ‘false’, 2PC functionality is disabled even if the ACL is set, clients
> that attempt to use this functionality would receive
> TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client to
> understand what the actual cause is.
>
> 17. completeTransaction(). We expect this to be only used during recovery.
> Could we document this clearly? Could we prevent it from being used
> incorrectly (e.g. throw an exception if the producer has called other
> methods like send())?
>
> 18. "either prepareTransaction was called or initTransaction(true) was
> called": "either" should be "neither"?
>
> 19. Since InitProducerId always bumps up the epoch, it creates a situation
> where there could be multiple outstanding txns. The following is an example
> of a potential problem during recovery.
>    The last txn epoch in the external store is 41 when the app dies.
>    Instance1 is created for recovery.
>      1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42,
> ongoingEpoch=41
>      2. (instance1) dies before completeTxn(41) can be called.
>    Instance2 is created for recovery.
>      3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43,
> ongoingEpoch=42
>      4. (instance2) completeTxn(41) => abort
>    The first problem is that 41 now is aborted when it should be committed.
> The second one is that it's not clear who could abort epoch 42, which is
> still open.
>
> Jun
>
>
> On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan <jols...@confluent.io.invalid
> >
> wrote:
>
> > Hey Artem,
> >
> > Thanks for the updates. I think what you say makes sense. I just updated
> my
> > KIP so I want to reconcile some of the changes we made especially with
> > respect to the TransactionLogValue.
> >
> > Firstly, I believe tagged fields require a default value so that if they
> > are not filled, we return the default (and know that they were empty).
> For
> > my KIP, I proposed the default for producer ID tagged fields should be
> -1.
> > I was wondering if we could update the KIP to include the default values
> > for producer ID and epoch.
> >
> > Next, I noticed we decided to rename the fields. I guess that the field
> > "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is
> that
> > correct? So we would have "TransactionProducerId" for the non-tagged
> field
> > and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
> > fields the final version after KIP-890 and KIP-936 are implemented. Is
> this
> > correct? I think the tags will need updating, but that is trivial.
> >
> > The final question I had was with respect to storing the new epoch. In
> > KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to
> > store the epoch since we can interpret the previous epoch based on the
> > producer ID. But here we could call the InitProducerId multiple times and
> > we only want the producer with the correct epoch to be able to commit the
> > transaction. Is that the correct reasoning for why we need epoch here but
> > not the Prepare/Commit state.
> >
> > Thanks,
> > Justine
> >
> > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Justine,
> > >
> > > After thinking a bit about supporting atomic dual writes for Kafka +
> > NoSQL
> > > database, I came to a conclusion that we do need to bump the epoch even
> > > with InitProducerId(keepPreparedTxn=true).  As I described in my
> previous
> > > email, we wouldn't need to bump the epoch to protect from zombies so
> that
> > > reasoning is still true.  But we cannot protect from split-brain
> > scenarios
> > > when two or more instances of a producer with the same transactional id
> > try
> > > to produce at the same time.  The dual-write example for SQL databases
> (
> > > https://github.com/apache/kafka/pull/14231/files) doesn't have a
> > > split-brain problem because execution is protected by the update lock
> on
> > > the transaction state record; however NoSQL databases may not have this
> > > protection (I'll write an example for NoSQL database dual-write soon).
> > >
> > > In a nutshell, here is an example of a split-brain scenario:
> > >
> > >    1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
> > >    2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
> > >    3. (instance1) CommitTxn, epoch bumped to 43
> > >    4. (instance2) CommitTxn, this is considered a retry, so it got
> epoch
> > 43
> > >    as well
> > >    5. (instance1) Produce messageA w/sequence 1
> > >    6. (instance2) Produce messageB w/sequence 1, this is considered a
> > >    duplicate
> > >    7. (instance2) Produce messageC w/sequence 2
> > >    8. (instance1) Produce messageD w/sequence 2, this is considered a
> > >    duplicate
> > >
> > > Now if either of those commit the transaction, it would have a mix of
> > > messages from the two instances (messageA and messageC).  With the
> proper
> > > epoch bump, instance1 would get fenced at step 3.
> > >
> > > In order to update epoch in InitProducerId(keepPreparedTxn=true) we
> need
> > to
> > > preserve the ongoing transaction's epoch (and producerId, if the epoch
> > > overflows), because we'd need to make a correct decision when we
> compare
> > > the PreparedTxnState that we read from the database with the
> (producerId,
> > > epoch) of the ongoing transaction.
> > >
> > > I've updated the KIP with the following:
> > >
> > >    - Ongoing transaction now has 2 (producerId, epoch) pairs -- one
> pair
> > >    describes the ongoing transaction, the other pair describes expected
> > > epoch
> > >    for operations on this transactional id
> > >    - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
> > >    - TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
> > >    values added as tagged fields, so it's easy to downgrade
> > >    - Added a note about downgrade in the Compatibility section
> > >    - Added a rejected alternative
> > >
> > > -Artem
> > >
> > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io>
> > > wrote:
> > >
> > > > Hi Justine,
> > > >
> > > > Thank you for the questions.  Currently (pre-KIP-939) we always bump
> > the
> > > > epoch on InitProducerId and abort an ongoing transaction (if any).  I
> > > > expect this behavior will continue with KIP-890 as well.
> > > >
> > > > With KIP-939 we need to support the case when the ongoing transaction
> > > > needs to be preserved when keepPreparedTxn=true.  Bumping epoch
> without
> > > > aborting or committing a transaction is tricky because epoch is a
> short
> > > > value and it's easy to overflow.  Currently, the overflow case is
> > handled
> > > > by aborting the ongoing transaction, which would send out transaction
> > > > markers with epoch=Short.MAX_VALUE to the partition leaders, which
> > would
> > > > fence off any messages with the producer id that started the
> > transaction
> > > > (they would have epoch that is less than Short.MAX_VALUE).  Then it
> is
> > > safe
> > > > to allocate a new producer id and use it in new transactions.
> > > >
> > > > We could say that maybe when keepPreparedTxn=true we bump epoch
> unless
> > it
> > > > leads to overflow, and don't bump epoch in the overflow case.  I
> don't
> > > > think it's a good solution because if it's not safe to keep the same
> > > epoch
> > > > when keepPreparedTxn=true, then we must handle the epoch overflow
> case
> > as
> > > > well.  So either we should convince ourselves that it's safe to keep
> > the
> > > > epoch and do it in the general case, or we always bump the epoch and
> > > handle
> > > > the overflow.
> > > >
> > > > With KIP-890, we bump the epoch on every transaction commit / abort.
> > > This
> > > > guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
> > > > increment epoch on the ongoing transaction, the client will have to
> > call
> > > > commit or abort to finish the transaction and will increment the
> epoch
> > > (and
> > > > handle epoch overflow, if needed).  If the ongoing transaction was
> in a
> > > bad
> > > > state and had some zombies waiting to arrive, the abort operation
> would
> > > > fence them because with KIP-890 every abort would bump the epoch.
> > > >
> > > > We could also look at this from the following perspective.  With
> > KIP-890,
> > > > zombies won't be able to cross transaction boundaries; each
> transaction
> > > > completion creates a boundary and any activity in the past gets
> > confined
> > > in
> > > > the boundary.  Then data in any partition would look like this:
> > > >
> > > > 1. message1, epoch=42
> > > > 2. message2, epoch=42
> > > > 3. message3, epoch=42
> > > > 4. marker (commit or abort), epoch=43
> > > >
> > > > Now if we inject steps 3a and 3b like this:
> > > >
> > > > 1. message1, epoch=42
> > > > 2. message2, epoch=42
> > > > 3. message3, epoch=42
> > > > 3a. crash
> > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > 4. marker (commit or abort), epoch=43
> > > >
> > > > The invariant still holds even with steps 3a and 3b -- whatever
> > activity
> > > > was in the past will get confined in the past with mandatory abort /
> > > commit
> > > > that must follow  InitProducerId(keepPreparedTxn=true).
> > > >
> > > > So KIP-890 provides the proper isolation between transactions, so
> > > > injecting crash + InitProducerId(keepPreparedTxn=true) into the
> > > > transaction sequence is safe from the zombie protection perspective.
> > > >
> > > > That said, I'm still thinking about it and looking for cases that
> might
> > > > break because we don't bump epoch when
> > > > InitProducerId(keepPreparedTxn=true), if such cases exist, we'll need
> > to
> > > > develop the logic to handle epoch overflow for ongoing transactions.
> > > >
> > > > -Artem
> > > >
> > > >
> > > >
> > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > <jols...@confluent.io.invalid> wrote:
> > > >
> > > >> Hey Artem,
> > > >>
> > > >> Thanks for the KIP. I had a question about epoch bumping.
> > > >>
> > > >> Previously when we send an InitProducerId request on Producer
> startup,
> > > we
> > > >> bump the epoch and abort the transaction. Is it correct to assume
> that
> > > we
> > > >> will still bump the epoch, but just not abort the transaction?
> > > >> If we still bump the epoch in this case, how does this interact with
> > > >> KIP-890 where we also bump the epoch on every transaction. (I think
> > this
> > > >> means that we may skip epochs and the data itself will all have the
> > same
> > > >> epoch)
> > > >>
> > > >> I may have follow ups depending on the answer to this. :)
> > > >>
> > > >> Thanks,
> > > >> Justine
> > > >>
> > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > >> <alivsh...@confluent.io.invalid> wrote:
> > > >>
> > > >> > Hi Alex,
> > > >> >
> > > >> > Thank you for your questions.
> > > >> >
> > > >> > > the purpose of having broker-level
> > > transaction.two.phase.commit.enable
> > > >> >
> > > >> > The thinking is that 2PC is a bit of an advanced construct so
> > enabling
> > > >> 2PC
> > > >> > in a Kafka cluster should be an explicit decision.  If it is set
> to
> > > >> 'false'
> > > >> > InitiProducerId (and initTransactions) would
> > > >> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > >> >
> > > >> > > WDYT about adding an AdminClient method that returns the state
> of
> > > >> > transaction.two.phase.commit.enable
> > > >> >
> > > >> > I wonder if the client could just try to use 2PC and then handle
> the
> > > >> error
> > > >> > (e.g. if it needs to fall back to ordinary transactions).  This
> way
> > it
> > > >> > could uniformly handle cases when Kafka cluster doesn't support
> 2PC
> > > >> > completely and cases when 2PC is restricted to certain users.  We
> > > could
> > > >> > also expose this config in describeConfigs, if the fallback
> approach
> > > >> > doesn't work for some scenarios.
> > > >> >
> > > >> > -Artem
> > > >> >
> > > >> >
> > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> > > >> > <asorokou...@confluent.io.invalid> wrote:
> > > >> >
> > > >> > > Hi Artem,
> > > >> > >
> > > >> > > Thanks for publishing this KIP!
> > > >> > >
> > > >> > > Can you please clarify the purpose of having broker-level
> > > >> > > transaction.two.phase.commit.enable config in addition to the
> new
> > > >> ACL? If
> > > >> > > the brokers are configured with
> > > >> > transaction.two.phase.commit.enable=false,
> > > >> > > at what point will a client configured with
> > > >> > > transaction.two.phase.commit.enable=true fail? Will it happen at
> > > >> > > KafkaProducer#initTransactions?
> > > >> > >
> > > >> > > WDYT about adding an AdminClient method that returns the state
> of
> > t
> > > >> > > ransaction.two.phase.commit.enable? This way, clients would know
> > in
> > > >> > advance
> > > >> > > if 2PC is enabled on the brokers.
> > > >> > >
> > > >> > > Best,
> > > >> > > Alex
> > > >> > >
> > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <
> > > roger.hoo...@gmail.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Other than supporting multiplexing transactional streams on a
> > > single
> > > >> > > > producer, I don't see how to improve it.
> > > >> > > >
> > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > > >> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >> > > >
> > > >> > > > > Hi Roger,
> > > >> > > > >
> > > >> > > > > Thank you for summarizing the cons.  I agree and I'm curious
> > > what
> > > >> > would
> > > >> > > > be
> > > >> > > > > the alternatives to solve these problems better and if they
> > can
> > > be
> > > >> > > > > incorporated into this proposal (or built independently in
> > > >> addition
> > > >> > to
> > > >> > > or
> > > >> > > > > on top of this proposal).  E.g. one potential extension we
> > > >> discussed
> > > >> > > > > earlier in the thread could be multiplexing logical
> > > transactional
> > > >> > > > "streams"
> > > >> > > > > with a single producer.
> > > >> > > > >
> > > >> > > > > -Artem
> > > >> > > > >
> > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <
> > > >> roger.hoo...@gmail.com
> > > >> > >
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > > > Thanks.  I like that you're moving Kafka toward supporting
> > > this
> > > >> > > > > dual-write
> > > >> > > > > > pattern.  Each use case needs to consider the tradeoffs.
> > You
> > > >> > already
> > > >> > > > > > summarized the pros very well in the KIP.  I would
> summarize
> > > the
> > > >> > cons
> > > >> > > > > > as follows:
> > > >> > > > > >
> > > >> > > > > > - you sacrifice availability - each write requires both DB
> > and
> > > >> > Kafka
> > > >> > > to
> > > >> > > > > be
> > > >> > > > > > available so I think your overall application availability
> > is
> > > 1
> > > >> -
> > > >> > > p(DB
> > > >> > > > is
> > > >> > > > > > unavailable)*p(Kafka is unavailable).
> > > >> > > > > > - latency will be higher and throughput lower - each write
> > > >> requires
> > > >> > > > both
> > > >> > > > > > writes to DB and Kafka while holding an exclusive lock in
> > DB.
> > > >> > > > > > - you need to create a producer per unit of concurrency in
> > > your
> > > >> app
> > > >> > > > which
> > > >> > > > > > has some overhead in the app and Kafka side (number of
> > > >> connections,
> > > >> > > > poor
> > > >> > > > > > batching).  I assume the producers would need to be
> > configured
> > > >> for
> > > >> > > low
> > > >> > > > > > latency (linger.ms=0)
> > > >> > > > > > - there's some complexity in managing stable transactional
> > ids
> > > >> for
> > > >> > > each
> > > >> > > > > > producer/concurrency unit in your application.  With k8s
> > > >> > deployment,
> > > >> > > > you
> > > >> > > > > > may need to switch to something like a StatefulSet that
> > gives
> > > >> each
> > > >> > > pod
> > > >> > > > a
> > > >> > > > > > stable identity across restarts.  On top of that pod
> > identity
> > > >> which
> > > >> > > you
> > > >> > > > > can
> > > >> > > > > > use as a prefix, you then assign unique transactional ids
> to
> > > >> each
> > > >> > > > > > concurrency unit (thread/goroutine).
> > > >> > > > > >
> > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > >> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Roger,
> > > >> > > > > > >
> > > >> > > > > > > Thank you for the feedback.  You make a very good point
> > that
> > > >> we
> > > >> > > also
> > > >> > > > > > > discussed internally.  Adding support for multiple
> > > concurrent
> > > >> > > > > > > transactions in one producer could be valuable but it
> > seems
> > > to
> > > >> > be a
> > > >> > > > > > fairly
> > > >> > > > > > > large and independent change that would deserve a
> separate
> > > >> KIP.
> > > >> > If
> > > >> > > > > such
> > > >> > > > > > > support is added we could modify 2PC functionality to
> > > >> incorporate
> > > >> > > > that.
> > > >> > > > > > >
> > > >> > > > > > > > Maybe not too bad but a bit of pain to manage these
> ids
> > > >> inside
> > > >> > > each
> > > >> > > > > > > process and across all application processes.
> > > >> > > > > > >
> > > >> > > > > > > I'm not sure if supporting multiple transactions in one
> > > >> producer
> > > >> > > > would
> > > >> > > > > > make
> > > >> > > > > > > id management simpler: we'd need to store a piece of
> data
> > > per
> > > >> > > > > > transaction,
> > > >> > > > > > > so whether it's N producers with a single transaction
> or N
> > > >> > > > transactions
> > > >> > > > > > > with a single producer, it's still roughly the same
> amount
> > > of
> > > >> > data
> > > >> > > to
> > > >> > > > > > > manage.  In fact, managing transactional ids (current
> > > >> proposal)
> > > >> > > might
> > > >> > > > > be
> > > >> > > > > > > easier, because the id is controlled by the application
> > and
> > > it
> > > >> > > knows
> > > >> > > > > how
> > > >> > > > > > to
> > > >> > > > > > > complete the transaction after crash / restart; while a
> > TID
> > > >> would
> > > >> > > be
> > > >> > > > > > > generated by Kafka and that would create a question of
> > > >> starting
> > > >> > > Kafka
> > > >> > > > > > > transaction, but not saving its TID and then crashing,
> > then
> > > >> > > figuring
> > > >> > > > > out
> > > >> > > > > > > which transactions to abort and etc.
> > > >> > > > > > >
> > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > slot
> > > in
> > > >> > the
> > > >> > > > > > > application
> > > >> > > > > > >
> > > >> > > > > > > This is a very valid concern.  Maybe we'd need to have
> > some
> > > >> > > > > multiplexing
> > > >> > > > > > of
> > > >> > > > > > > transactional logical "streams" over the same
> connection.
> > > >> Seems
> > > >> > > > like a
> > > >> > > > > > > separate KIP, though.
> > > >> > > > > > >
> > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > model
> > > >> per
> > > >> > > > > > > application process?
> > > >> > > > > > >
> > > >> > > > > > > That's a fair assessment.  Not necessarily exactly
> > > >> > single-threaded
> > > >> > > > per
> > > >> > > > > > > application, but a single producer per thread model
> (i.e.
> > an
> > > >> > > > > application
> > > >> > > > > > > could have a pool of threads + producers to increase
> > > >> > concurrency).
> > > >> > > > > > >
> > > >> > > > > > > -Artem
> > > >> > > > > > >
> > > >> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <
> > > >> > > roger.hoo...@gmail.com
> > > >> > > > >
> > > >> > > > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Artem,
> > > >> > > > > > > >
> > > >> > > > > > > > Thanks for the reply.
> > > >> > > > > > > >
> > > >> > > > > > > > If I understand correctly, Kafka does not support
> > > concurrent
> > > >> > > > > > transactions
> > > >> > > > > > > > from the same producer (transactional id).  I think
> this
> > > >> means
> > > >> > > that
> > > >> > > > > > > > applications that want to support in-process
> concurrency
> > > >> (say
> > > >> > > > > > > thread-level
> > > >> > > > > > > > concurrency with row-level DB locking) would need to
> > > manage
> > > >> > > > separate
> > > >> > > > > > > > transactional ids and producers per thread and then
> > store
> > > >> txn
> > > >> > > state
> > > >> > > > > > > > accordingly.   The potential usability downsides I see
> > are
> > > >> > > > > > > > 1) managing a set of transactional ids for each
> > > application
> > > >> > > process
> > > >> > > > > > that
> > > >> > > > > > > > scales up to it's max concurrency.  Maybe not too bad
> > but
> > > a
> > > >> bit
> > > >> > > of
> > > >> > > > > pain
> > > >> > > > > > > to
> > > >> > > > > > > > manage these ids inside each process and across all
> > > >> application
> > > >> > > > > > > processes.
> > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > slot
> > > in
> > > >> > the
> > > >> > > > > > > > application - this could create a lot more producers
> and
> > > >> > > resultant
> > > >> > > > > > > > connections to Kafka than the typical model of a
> single
> > > >> > producer
> > > >> > > > per
> > > >> > > > > > > > process.
> > > >> > > > > > > >
> > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > model
> > > >> per
> > > >> > > > > > > application
> > > >> > > > > > > > process?
> > > >> > > > > > > >
> > > >> > > > > > > > Thanks,
> > > >> > > > > > > >
> > > >> > > > > > > > Roger
> > > >> > > > > > > >
> > > >> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > >> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Roger, Arjun,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thank you for the questions.
> > > >> > > > > > > > > > It looks like the application must have stable
> > > >> > transactional
> > > >> > > > ids
> > > >> > > > > > over
> > > >> > > > > > > > > time?
> > > >> > > > > > > > >
> > > >> > > > > > > > > The transactional id should uniquely identify a
> > producer
> > > >> > > instance
> > > >> > > > > and
> > > >> > > > > > > > needs
> > > >> > > > > > > > > to be stable across the restarts.  If the
> > transactional
> > > >> id is
> > > >> > > not
> > > >> > > > > > > stable
> > > >> > > > > > > > > across restarts, then zombie messages from a
> previous
> > > >> > > incarnation
> > > >> > > > > of
> > > >> > > > > > > the
> > > >> > > > > > > > > producer may violate atomicity.  If there are 2
> > producer
> > > >> > > > instances
> > > >> > > > > > > > > concurrently producing data with the same
> > transactional
> > > >> id,
> > > >> > > they
> > > >> > > > > are
> > > >> > > > > > > > going
> > > >> > > > > > > > > to constantly fence each other and most likely make
> > > >> little or
> > > >> > > no
> > > >> > > > > > > > progress.
> > > >> > > > > > > > >
> > > >> > > > > > > > > The name might be a little bit confusing as it may
> be
> > > >> > mistaken
> > > >> > > > for
> > > >> > > > > a
> > > >> > > > > > > > > transaction id / TID that uniquely identifies every
> > > >> > > transaction.
> > > >> > > > > The
> > > >> > > > > > > > name
> > > >> > > > > > > > > and the semantics were defined in the original
> > > >> > > > > exactly-once-semantics
> > > >> > > > > > > > (EoS)
> > > >> > > > > > > > > proposal (
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > >> > > > > > > > > )
> > > >> > > > > > > > > and KIP-939 just build on top of that.
> > > >> > > > > > > > >
> > > >> > > > > > > > > > I'm curious to understand what happens if the
> > producer
> > > >> > dies,
> > > >> > > > and
> > > >> > > > > > does
> > > >> > > > > > > > not
> > > >> > > > > > > > > come up and recover the pending transaction within
> the
> > > >> > > > transaction
> > > >> > > > > > > > timeout
> > > >> > > > > > > > > interval.
> > > >> > > > > > > > >
> > > >> > > > > > > > > If the producer / application never comes back, the
> > > >> > transaction
> > > >> > > > > will
> > > >> > > > > > > > remain
> > > >> > > > > > > > > in prepared (a.k.a. "in-doubt") state until an
> > operator
> > > >> > > > forcefully
> > > >> > > > > > > > > terminates the transaction.  That's why there is a
> new
> > > >> ACL is
> > > >> > > > > defined
> > > >> > > > > > > in
> > > >> > > > > > > > > this proposal -- this functionality should only
> > provided
> > > >> to
> > > >> > > > > > > applications
> > > >> > > > > > > > > that implement proper recovery logic.
> > > >> > > > > > > > >
> > > >> > > > > > > > > -Artem
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <
> > > >> > > > > > arjun.sat...@gmail.com>
> > > >> > > > > > > > > wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Hello Artem,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks for the KIP.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > I have the same question as Roger on concurrent
> > > writes,
> > > >> and
> > > >> > > an
> > > >> > > > > > > > additional
> > > >> > > > > > > > > > one on consumer behavior. Typically, transactions
> > will
> > > >> > > timeout
> > > >> > > > if
> > > >> > > > > > not
> > > >> > > > > > > > > > committed within some time interval. With the
> > proposed
> > > >> > > changes
> > > >> > > > in
> > > >> > > > > > > this
> > > >> > > > > > > > > KIP,
> > > >> > > > > > > > > > consumers cannot consume past the ongoing
> > transaction.
> > > >> I'm
> > > >> > > > > curious
> > > >> > > > > > to
> > > >> > > > > > > > > > understand what happens if the producer dies, and
> > does
> > > >> not
> > > >> > > come
> > > >> > > > > up
> > > >> > > > > > > and
> > > >> > > > > > > > > > recover the pending transaction within the
> > transaction
> > > >> > > timeout
> > > >> > > > > > > > interval.
> > > >> > > > > > > > > Or
> > > >> > > > > > > > > > are we saying that when used in this 2PC context,
> we
> > > >> should
> > > >> > > > > > configure
> > > >> > > > > > > > > these
> > > >> > > > > > > > > > transaction timeouts to very large durations?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks in advance!
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Best,
> > > >> > > > > > > > > > Arjun
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <
> > > >> > > > > > roger.hoo...@gmail.com
> > > >> > > > > > > >
> > > >> > > > > > > > > > wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Hi Artem,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks for writing this KIP.  Can you clarify
> the
> > > >> > > > requirements
> > > >> > > > > a
> > > >> > > > > > > bit
> > > >> > > > > > > > > more
> > > >> > > > > > > > > > > for managing transaction state?  It looks like
> the
> > > >> > > > application
> > > >> > > > > > must
> > > >> > > > > > > > > have
> > > >> > > > > > > > > > > stable transactional ids over time?   What is
> the
> > > >> > > granularity
> > > >> > > > > of
> > > >> > > > > > > > those
> > > >> > > > > > > > > > ids
> > > >> > > > > > > > > > > and producers?  Say the application is a
> > > >> multi-threaded
> > > >> > > Java
> > > >> > > > > web
> > > >> > > > > > > > > server,
> > > >> > > > > > > > > > > can/should all the concurrent threads share a
> > > >> > transactional
> > > >> > > > id
> > > >> > > > > > and
> > > >> > > > > > > > > > > producer?  That doesn't seem right to me unless
> > the
> > > >> > > > application
> > > >> > > > > > is
> > > >> > > > > > > > > using
> > > >> > > > > > > > > > > global DB locks that serialize all requests.
> > > >> Instead, if
> > > >> > > the
> > > >> > > > > > > > > application
> > > >> > > > > > > > > > > uses row-level DB locks, there could be
> multiple,
> > > >> > > concurrent,
> > > >> > > > > > > > > independent
> > > >> > > > > > > > > > > txns happening in the same JVM so it seems like
> > the
> > > >> > > > granularity
> > > >> > > > > > > > > managing
> > > >> > > > > > > > > > > transactional ids and txn state needs to line up
> > > with
> > > >> > > > > granularity
> > > >> > > > > > > of
> > > >> > > > > > > > > the
> > > >> > > > > > > > > > DB
> > > >> > > > > > > > > > > locking.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Does that make sense or am I misunderstanding?
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Roger
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > >> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > > Hello,
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > This is a discussion thread for
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > >> > > > > > > > > > > > .
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > The KIP proposes extending Kafka transaction
> > > support
> > > >> > > (that
> > > >> > > > > > > already
> > > >> > > > > > > > > uses
> > > >> > > > > > > > > > > 2PC
> > > >> > > > > > > > > > > > under the hood) to enable atomicity of dual
> > writes
> > > >> to
> > > >> > > Kafka
> > > >> > > > > and
> > > >> > > > > > > an
> > > >> > > > > > > > > > > external
> > > >> > > > > > > > > > > > database, and helps to fix a long standing
> Flink
> > > >> issue.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > An example of code that uses the dual write
> > recipe
> > > >> with
> > > >> > > > JDBC
> > > >> > > > > > and
> > > >> > > > > > > > > should
> > > >> > > > > > > > > > > > work for most SQL databases is here
> > > >> > > > > > > > > > > > https://github.com/apache/kafka/pull/14231.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > The FLIP for the sister fix in Flink is here
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > -Artem
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Reply via email to