Hi Jun,

> 32. ... metric name ...

I've updated the metric name to be
*kafka.coordinator.transaction:type=TransactionStateManager,name=ActiveTransactionOpenTimeMax.*

Let me know if it works.
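In case it helps with verification, here is a small sketch that parses the new name with the standard JMX ObjectName class. This only checks the name's syntax and key properties; reading the actual gauge value of course requires a running broker, and the exact attribute exposed on the Yammer gauge is my assumption, not part of the KIP.

```java
import javax.management.ObjectName;

public class MetricNameCheck {
    // Builds the canonical JMX ObjectName for the new metric,
    // validating its domain/key-property syntax in the process.
    static ObjectName activeTxnOpenTimeMax() throws Exception {
        return new ObjectName(
            "kafka.coordinator.transaction:type=TransactionStateManager,name=ActiveTransactionOpenTimeMax");
    }

    public static void main(String[] args) throws Exception {
        ObjectName n = activeTxnOpenTimeMax();
        System.out.println(n.getDomain());            // kafka.coordinator.transaction
        System.out.println(n.getKeyProperty("type")); // TransactionStateManager
        System.out.println(n.getKeyProperty("name")); // ActiveTransactionOpenTimeMax
    }
}
```

Against a live broker the same ObjectName can be passed to MBeanServerConnection.getAttribute to read the gauge.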

-Artem



On Thu, Feb 29, 2024 at 12:03 PM Artem Livshits <alivsh...@confluent.io>
wrote:

> Hi Jun,
>
> >  So, it doesn't provide the same guarantees as 2PC either.
>
> I think the key point is that we don't claim 2PC guarantees in that case.
> Maybe it's splitting hairs from the technical perspective (at the end of
> the day, if the operator doesn't let the user use 2PC, it's going to be a
> "works until timeout" solution), but from a user-model perspective it
> provides a clear structure:
>
> - if 2PC is possible then all guarantees are in place and there is no gray
> area where we sort of provide guarantees but not fully
> - if 2PC is not possible, then it's a well-informed constraint / decision
> with well-known characteristics and the user can choose whether this is
> acceptable or not for them
>
> Maybe we can look at it from a slightly different perspective: we are not
> making a choice between allowing or not allowing keepPreparedTxn=true
> when 2PC=false (even though that's exactly how it looks from the KIP).  In
> fact, the choice we're making is whether Flink will be able to use an
> official API when 2PC is not possible (and I think we've converged to agree
> that sometimes it won't be) or keep using a reflection hack.  In other
> words, we already have a hacky implementation for the case of
> keepPreparedTxn=true + 2PC=false, our choice is only whether we provide an
> official API for that or not.
>
> In general, if someone goes and implements a reflection-based solution
> that's an indication that there is a gap in public APIs.  And we can debate
> whether keepPreparedTxn=true + 2PC=false is the right API or not; and if we
> think it's not, then we should provide an alternative.  Right now the
> alternative is to just keep using the reflection and I think it's always
> worse than using a public API.
>
> -Artem
>
> On Wed, Feb 28, 2024 at 2:23 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
>> Hi, Artem,
>>
>> Thanks for the reply.
>>
>> I understand your concern on having a timeout breaking the 2PC guarantees.
>> However, the fallback plan to disable 2PC with an independent
>> keepPreparedTxn is subject to the timeout too. So, it doesn't provide the
>> same guarantees as 2PC either.
>>
>> To me, if we provide a new functionality, we should make it easy such that
>> the application developer only needs to implement it in one way, which is
>> always correct. Then, we can consider what additional things are needed to
>> make the operator comfortable enabling it.
>>
>> Jun
>>
>> On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>> > Hi Jun,
>> >
>> > Thank you for the discussion.
>> >
>> > > For 3b, it would be useful to understand the reason why an admin
>> doesn't
>> > authorize 2PC for self-hosted Flink
>> >
>> > I think the nuance here is that for cloud, there is a cloud admin
>> > (operator) and there is a cluster admin (who, for example, could manage
>> > ACLs on topics, etc.).  The 2PC functionality can affect cloud operations,
>> > because a long running transaction can block the last stable offset and
>> > prevent compaction or data tiering.  In a multi-tenant environment, a
>> long
>> > running transaction that involves consumer offset may affect data that
>> is
>> > shared by multiple tenants (Flink transactions don't use consumer
>> offsets,
>> > so this is not an issue for Flink, but we'd need a separate ACL or some
>> > other way to express this permission if we wanted to go in that
>> direction).
>> >
>> > For that reason, I expect 2PC to be controlled by the cloud operator
>> and it
>> > just may not be scalable for the cloud operator to manage all potential
>> > interactions required to resolve in-doubt transactions (communicate to
>> the
>> > end users, etc.).  In general, we make no assumptions about Kafka
>> > applications -- they may come and go, they may abandon transactional ids
>> > and generate new ones.  For 2PC we need to make sure that the
>> application
>> > is highly available and wouldn't easily abandon an open transaction in
>> > Kafka.
>> >
>> > > If so, another way to address that is to allow the admin to set a
>> timeout
>> > even for the 2PC case.
>> >
>> > This effectively abandons the 2PC guarantee because it creates a case
>> for
>> > Kafka to unilaterally make an automatic decision on a prepared
>> > transaction.  I think it's fundamental for 2PC to abandon this ability
>> and
>> > wait for the external coordinator for the decision, after all the
>> > coordinator may legitimately be unavailable for an arbitrary amount of
>> > time.  Also, we already have a timeout on regular Kafka transactions,
>> > having another "special" timeout could be confusing, and a large enough
>> > timeout could still produce the undesirable effects for the cloud
>> > operations (so we kind of get the worst of both options -- we don't provide
>> > guarantees and still have impact on operations).
>> >
>> > -Artem
>> >
>> > On Fri, Feb 23, 2024 at 8:55 AM Jun Rao <j...@confluent.io.invalid>
>> wrote:
>> >
>> > > Hi, Artem,
>> > >
>> > > Thanks for the reply.
>> > >
>> > > For 3b, it would be useful to understand the reason why an admin
>> doesn't
>> > > authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
>> > > unbounded timeout that could lead to unbounded outstanding
>> transactions?
>> > If
>> > > so, another way to address that is to allow the admin to set a timeout
>> > even
>> > > for the 2PC case. The timeout would be long enough for well-behaved
>> > > applications to complete 2PC operations, but not too long for
>> > > non-behaving applications' transactions to hang.
>> > >
>> > > Jun
>> > >
>> > > On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
>> > > <alivsh...@confluent.io.invalid> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > > 20A. One option is to make the API initTransactions(boolean
>> > enable2PC).
>> > > >
>> > > > We could do that.  I think there is a little bit of symmetry between
>> > the
>> > > > client and server that would get lost with this approach (server has
>> > > > enable2PC as config), but I don't really see a strong reason for
>> > > enable2PC
>> > > > to be a config vs. an argument for initTransactions.  But let's see
>> if
>> > we
>> > > > find 20B to be a strong consideration for keeping a separate flag
>> for
>> > > > keepPreparedTxn.
>> > > >
>> > > > > 20B. But realistically, we want Flink (and other apps) to have a
>> > single
>> > > > implementation
>> > > >
>> > > > That's correct and here's what I think can happen if we don't allow
>> > > > independent keepPreparedTxn:
>> > > >
>> > > > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster --
>> reflection is
>> > > > used, which effectively implements keepPreparedTxn=true without our
>> > > > explicit support.
>> > > > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
>> > > > either fall back to reflection or we just say we don't support this,
>> > have
>> > > > to upgrade Kafka cluster first.
>> > > > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
>> > > > interesting depending on whether the Kafka cluster authorizes 2PC or
>> > not:
>> > > >  3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything
>> > uses
>> > > > KIP-939 and there is no problem
>> > > >  3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we
>> > can
>> > > > either fallback to reflection or use keepPreparedTxn=true even if
>> 2PC
>> > is
>> > > > not enabled.
>> > > >
>> > > > It seems to be ok to not support case 2 (i.e. require Kafka upgrade
>> > > first),
>> > > > it shouldn't be an issue for cloud offerings as cloud providers are
>> > > likely
>> > > > to upgrade their Kafka to the latest versions.
>> > > >
>> > > > The case 3b seems to be important to support, though -- the latest
>> > > version
>> > > > of everything should work at least as well (and preferably better)
>> than
>> > > > previous ones.  It's possible to downgrade to case 1, but it's
>> probably
>> > > not
>> > > > sustainable as newer versions of Flink would also add other features
>> > that
>> > > > the customers may want to take advantage of.
>> > > >
>> > > > If we enabled keepPreparedTxn=true even without 2PC, then we could
>> > enable
>> > > > case 3b without the need to fall back to reflection, so we could get
>> > rid
>> > > of
>> > > > reflection-based logic and just have a single implementation based
>> on
>> > > > KIP-939.
>> > > >
>> > > > > 32. My suggestion is to change
>> > > >
>> > > > Let me think about it and I'll come back to this.
>> > > >
>> > > > -Artem
>> > > >
>> > > > On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid>
>> > > wrote:
>> > > >
>> > > > > Hi, Artem,
>> > > > >
>> > > > > Thanks for the reply.
>> > > > >
>> > > > > 20A. One option is to make the API initTransactions(boolean
>> > enable2PC).
>> > > > > Then, it's clear from the code whether 2PC related logic should be
>> > > added.
>> > > > >
>> > > > > 20B. But realistically, we want Flink (and other apps) to have a
>> > single
>> > > > > implementation of the 2PC logic, not two different
>> implementations,
>> > > > right?
>> > > > >
>> > > > > 32. My suggestion is to
>> > > > > change
>> > > > >
>> > > >
>> > >
>> >
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > to sth like
>> > > > > Metric Name                        Type  Group
>> > > > > Tags   Description
>> > > > > active-transaction-open-time-max   Max
>> > >  transaction-coordinator-metrics
>> > > > >  none  The max time a currently-open transaction has been open
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
>> > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > >
>> > > > > > Hi Jun,
>> > > > > >
>> > > > > > > 20A.  This only takes care of the abort case. The application
>> > still
>> > > > > needs
>> > > > > > to be changed to handle the commit case properly
>> > > > > >
>> > > > > > My point here is that looking at the initTransactions() call
>> it's
>> > not
>> > > > > clear
>> > > > > > what the semantics is.  Say I'm doing code review, I cannot say
>> if
>> > > the
>> > > > > code
>> > > > > > is correct or not -- if the config (that's something that's
>> > > > > > theoretically not known at the time of code review) is going to
>> > > enable
>> > > > > 2PC,
>> > > > > > then the correct code should look one way, otherwise it would
>> need
>> > to
>> > > > > look
>> > > > > > differently.  Also, if code is written with initTransactions()
>> > > > > > without an explicit abort, and then for whatever reason gets used
>> > > > > > with 2PC enabled (could be a library in a bigger product), it'll
>> > > > > > start breaking in a non-intuitive way.
>> > > > > >
>> > > > > > > 20B. Hmm, if the admin disables 2PC, there is likely a reason
>> > > behind
>> > > > > that
>> > > > > >
>> > > > > > That's true, but reality may be more complicated.  Say a user
>> wants
>> > > to
>> > > > > run
>> > > > > > a self-managed Flink with Confluent cloud.  Confluent cloud admin
>> > may
>> > > > not
>> > > > > > be comfortable enabling 2PC to general user accounts that use
>> > > services
>> > > > > not
>> > > > > > managed by Confluent (the same way Confluent doesn't allow
>> > increasing
>> > > > max
>> > > > > > transaction timeout for general user accounts).  Right now,
>> > > > self-managed
>> > > > > > Flink works because it uses reflection, if it moves to use
>> public
>> > > APIs
>> > > > > > provided by KIP-939 it'll break.
>> > > > > >
>> > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
>> > has
>> > > a
>> > > > > > group and a name. There is no type and no package name.
>> > > > > >
>> > > > > > Is this a suggestion to change or confirmation that the current
>> > logic
>> > > > is
>> > > > > > ok?  I just copied an existing metric but can change if needed.
>> > > > > >
>> > > > > > -Artem
>> > > > > >
>> > > > > > On Tue, Feb 20, 2024 at 11:25 AM Jun Rao
>> <j...@confluent.io.invalid
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi, Artem,
>> > > > > > >
>> > > > > > > Thanks for the reply.
>> > > > > > >
>> > > > > > > 20. "Say if an application
>> > > > > > > currently uses initTransactions() to achieve the current
>> > semantics,
>> > > > it
>> > > > > > > would need to be rewritten to use initTransactions() + abort
>> to
>> > > > achieve
>> > > > > > the
>> > > > > > > same semantics if the config is changed. "
>> > > > > > >
>> > > > > > > This only takes care of the abort case. The application still
>> > needs
>> > > > to
>> > > > > be
>> > > > > > > changed to handle the commit case properly
>> > > > > > > if transaction.two.phase.commit.enable is set to true.
>> > > > > > >
>> > > > > > > "Even when KIP-939 is implemented,
>> > > > > > > there would be situations when 2PC is disabled by the admin
>> (e.g.
>> > > > Kafka
>> > > > > > > service providers may be reluctant to enable 2PC for Flink
>> > services
>> > > > > that
>> > > > > > > users host themselves), so we either have to perpetuate the
>> > > > > > > reflection-based implementation in Flink or enable
>> > > > keepPreparedTxn=true
>> > > > > > > without 2PC."
>> > > > > > >
>> > > > > > > Hmm, if the admin disables 2PC, there is likely a reason
>> behind
>> > > > that. I
>> > > > > > am
>> > > > > > > not sure that we should provide an API to encourage the
>> > application
>> > > > to
>> > > > > > > circumvent that.
>> > > > > > >
>> > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
>> > has
>> > > a
>> > > > > > group
>> > > > > > > and a name. There is no type and no package name.
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
>> > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > >
>> > > > > > > > Hi Jun,
>> > > > > > > >
>> > > > > > > > Thank you for your questions.
>> > > > > > > >
>> > > > > > > > > 20. So to abort a prepared transaction after the producer
>> > > start,
>> > > > we
>> > > > > > > could
>> > > > > > > > use ...
>> > > > > > > >
>> > > > > > > > I agree, initTransaction(true) + abort would accomplish the
>> > > > behavior
>> > > > > of
>> > > > > > > > initTransactions(false), so we could technically have fewer
>> > ways
>> > > to
>> > > > > > > achieve
>> > > > > > > > the same thing, which is generally valuable.  I wonder,
>> though,
>> > > if
>> > > > > that
>> > > > > > > > would be intuitive from the application perspective.  Say
>> if an
>> > > > > > > application
>> > > > > > > > currently uses initTransactions() to achieve the current
>> > > semantics,
>> > > > > it
>> > > > > > > > would need to be rewritten to use initTransactions() +
>> abort to
>> > > > > achieve
>> > > > > > > the
>> > > > > > > > same semantics if the config is changed.  I think this could
>> > > create
>> > > > > > > > subtle confusion, as the config change is generally
>> decoupled
>> > > from
>> > > > > > > changing
>> > > > > > > > application implementation.
>> > > > > > > >
>> > > > > > > > >  The use case mentioned for keepPreparedTxn=true without
>> 2PC
>> > > > > doesn't
>> > > > > > > seem
>> > > > > > > > very important
>> > > > > > > >
>> > > > > > > > I agree, it's not a strict requirement.  It is, however, a
>> > > missing
>> > > > > > option
>> > > > > > > > in the public API, so currently Flink has to use reflection
>> to
>> > > > > emulate
>> > > > > > > this
>> > > > > > > > functionality without 2PC support.   Even when KIP-939 is
>> > > > > implemented,
>> > > > > > > > there would be situations when 2PC is disabled by the admin
>> > (e.g.
>> > > > > Kafka
>> > > > > > > > service providers may be reluctant to enable 2PC for Flink
>> > > services
>> > > > > > that
>> > > > > > > > users host themselves), so we either have to perpetuate the
>> > > > > > > > reflection-based implementation in Flink or enable
>> > > > > keepPreparedTxn=true
>> > > > > > > > without 2PC.
>> > > > > > > >
>> > > > > > > > > 32.
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > > > >
>> > > > > > > > I just followed the existing metric implementation example
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
>> > > > > > > > ,
>> > > > > > > > which maps to
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
>> > > > > > > >
>> > > > > > > > > 33. "If the value is 'true' then the corresponding field
>> is
>> > set
>> > > > > > > >
>> > > > > > > > That's correct.  Updated the KIP.
>> > > > > > > >
>> > > > > > > > -Artem
>> > > > > > > >
>> > > > > > > > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao
>> > <j...@confluent.io.invalid
>> > > >
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi, Artem,
>> > > > > > > > >
>> > > > > > > > > Thanks for the reply.
>> > > > > > > > >
>> > > > > > > > > 20. So to abort a prepared transaction after producer
>> start,
>> > we
>> > > > > could
>> > > > > > > use
>> > > > > > > > > either
>> > > > > > > > >   producer.initTransactions(false)
>> > > > > > > > > or
>> > > > > > > > >   producer.initTransactions(true)
>> > > > > > > > >   producer.abortTransaction
>> > > > > > > > > Could we just always use the latter API? If we do this, we
>> > > could
>> > > > > > > > > potentially eliminate the keepPreparedTxn flag in
>> > > > > initTransactions().
>> > > > > > > > After
>> > > > > > > > > the initTransactions() call, the outstanding txn is always
>> > > > > preserved
>> > > > > > if
>> > > > > > > > 2pc
>> > > > > > > > > is enabled and aborted if 2pc is disabled. The use case
>> > > mentioned
>> > > > > for
>> > > > > > > > > keepPreparedTxn=true without 2PC doesn't seem very
>> important.
>> > > If
>> > > > we
>> > > > > > > could
>> > > > > > > > > do that, it seems that we have (1) less redundant and
>> simpler
>> > > > APIs;
>> > > > > > (2)
>> > > > > > > > > more symmetric syntax for aborting/committing a prepared
>> txn
>> > > > after
>> > > > > > > > producer
>> > > > > > > > > restart.
>> > > > > > > > >
>> > > > > > > > > 32.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > > > > > Is this a Yammer or kafka metric? The former uses the
>> camel
>> > > case
>> > > > > for
>> > > > > > > name
>> > > > > > > > > and type. The latter uses the hyphen notation, but doesn't
>> > have
>> > > > the
>> > > > > > > type
>> > > > > > > > > attribute.
>> > > > > > > > >
>> > > > > > > > > 33. "If the value is 'true' then the corresponding field
>> is
>> > set
>> > > > in
>> > > > > > the
>> > > > > > > > > InitProducerIdRequest and the KafkaProducer object is set
>> > into
>> > > a
>> > > > > > state
>> > > > > > > > > which only allows calling .commitTransaction or
>> > > > .abortTransaction."
>> > > > > > > > > We should also allow .completeTransaction, right?
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
>> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jun,
>> > > > > > > > > >
>> > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to
>> abort
>> > > and
>> > > > > > > commit
>> > > > > > > > a
>> > > > > > > > > > prepared txn are not symmetric.
>> > > > > > > > > >
>> > > > > > > > > > For Flink it is expected that Flink would call
>> > > > .commitTransaction
>> > > > > > or
>> > > > > > > > > > .abortTransaction directly, it wouldn't need to deal
>> with
>> > > > > > > > > PreparedTxnState,
>> > > > > > > > > > the outcome is actually determined by Flink's job
>> > > > > > > > > > manager, not by
>> > > > > > > > > > comparison of PreparedTxnState.  So for Flink, if the
>> > > > > > > > > > Kafka sink crashes
>> > > > > > > > > > and restarts there are 2 cases:
>> > > > > > > > > >
>> > > > > > > > > > 1. Transaction is not prepared.  In that case just call
>> > > > > > > > > > producer.initTransactions(false) and then can start
>> > > > transactions
>> > > > > as
>> > > > > > > > > needed.
>> > > > > > > > > > 2. Transaction is prepared.  In that case call
>> > > > > > > > > > producer.initTransactions(true) and wait for the
>> decision
>> > > from
>> > > > > the
>> > > > > > > job
>> > > > > > > > > > manager.  Note that it's not given that the transaction
>> > will
>> > > > get
>> > > > > > > > > committed,
>> > > > > > > > > > the decision could also be an abort.
>> > > > > > > > > >
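[To make the two recovery cases above concrete, here is a compilable sketch against a stub of the proposed API. Note that initTransactions(boolean) is the overload proposed in this KIP, not a released producer method, and the stub interface, method names, and return values below are purely illustrative.]

```java
// Stub of the proposed KIP-939 producer surface; names follow the KIP,
// this is not the released KafkaProducer API.
interface TxnProducer {
    void initTransactions(boolean keepPreparedTxn);
    void commitTransaction();
    void abortTransaction();
}

public class FlinkStyleRecovery {
    // Recovery decision as described above: Flink's job manager,
    // not Kafka, decides the outcome of a prepared transaction.
    static String recover(TxnProducer producer,
                          boolean txnPrepared,
                          boolean jobManagerSaysCommit) {
        if (!txnPrepared) {
            // Case 1: nothing prepared -- abort any ongoing txn, start clean.
            producer.initTransactions(false);
            return "fresh";
        }
        // Case 2: prepared -- keep the txn and apply the job manager's decision,
        // which could legitimately be either commit or abort.
        producer.initTransactions(true);
        if (jobManagerSaysCommit) {
            producer.commitTransaction();
            return "committed";
        } else {
            producer.abortTransaction();
            return "aborted";
        }
    }
}
```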
>> > > > > > > > > >  > 21. transaction.max.timeout.ms could in theory be
>> > > MAX_INT.
>> > > > > > > Perhaps
>> > > > > > > > we
>> > > > > > > > > > could use a negative timeout in the record to indicate
>> 2PC?
>> > > > > > > > > >
>> > > > > > > > > > -1 sounds good, updated.
>> > > > > > > > > >
>> > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
>> > txn.
>> > > > Do
>> > > > > we
>> > > > > > > > need
>> > > > > > > > > > both?
>> > > > > > > > > >
>> > > > > > > > > > I think of producer.initTransactions() to be an
>> > > implementation
>> > > > > for
>> > > > > > > > > > adminClient.forceTerminateTransaction(transactionalId).
>> > > > > > > > > >
>> > > > > > > > > > > 31. "This would flush all the pending messages and
>> > > transition
>> > > > > the
>> > > > > > > > > > producer
>> > > > > > > > > >
>> > > > > > > > > > Updated the KIP to clarify that IllegalStateException
>> will
>> > be
>> > > > > > thrown.
>> > > > > > > > > >
>> > > > > > > > > > -Artem
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao
>> > > > <j...@confluent.io.invalid
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for the reply.
>> > > > > > > > > > >
>> > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to
>> abort
>> > > and
>> > > > > > > commit
>> > > > > > > > a
>> > > > > > > > > > > prepared txn are not symmetric.
>> > > > > > > > > > > To abort, the app will just call
>> > > > > > > > > > >   producer.initTransactions(false)
>> > > > > > > > > > >
>> > > > > > > > > > > To commit, the app needs to call
>> > > > > > > > > > >   producer.initTransactions(true)
>> > > > > > > > > > >   producer.completeTransaction(preparedTxnState)
>> > > > > > > > > > >
>> > > > > > > > > > > Will this be a concern? For the dual-writer usage,
>> both
>> > > > > > > abort/commit
>> > > > > > > > > use
>> > > > > > > > > > > the same API.
>> > > > > > > > > > >
>> > > > > > > > > > > 21. transaction.max.timeout.ms could in theory be
>> > MAX_INT.
>> > > > > > Perhaps
>> > > > > > > > we
>> > > > > > > > > > > could
>> > > > > > > > > > > use a negative timeout in the record to indicate 2PC?
>> > > > > > > > > > >
>> > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
>> > txn.
>> > > > Do
>> > > > > we
>> > > > > > > > need
>> > > > > > > > > > > both?
>> > > > > > > > > > >   producer.initTransactions(false)
>> > > > > > > > > > >
>>  adminClient.forceTerminateTransaction(transactionalId)
>> > > > > > > > > > >
>> > > > > > > > > > > 31. "This would flush all the pending messages and
>> > > transition
>> > > > > the
>> > > > > > > > > > producer
>> > > > > > > > > > > into a mode where only .commitTransaction,
>> > > .abortTransaction,
>> > > > > or
>> > > > > > > > > > > .completeTransaction could be called.  If the call is
>> > > > > successful
>> > > > > > > (all
>> > > > > > > > > > > messages successfully got flushed to all partitions)
>> the
>> > > > > > > transaction
>> > > > > > > > is
>> > > > > > > > > > > prepared."
>> > > > > > > > > > >  If the producer calls send() in that state, what
>> > exception
>> > > > > will
>> > > > > > > the
>> > > > > > > > > > caller
>> > > > > > > > > > > receive?
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
>> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > >
>> > > > > > > > > > > > >  Then, should we change the following in the
>> example
>> > to
>> > > > use
>> > > > > > > > > > > > InitProducerId(true) instead?
>> > > > > > > > > > > >
>> > > > > > > > > > > > We could. I just thought that it's good to make the
>> > > example
>> > > > > > > > > > > self-contained
>> > > > > > > > > > > > by starting from a clean state.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Also, could Flink just follow the dual-write
>> recipe?
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think it would bring some unnecessary logic to
>> Flink
>> > > (or
>> > > > > any
>> > > > > > > > other
>> > > > > > > > > > > system
>> > > > > > > > > > > > that already has a transaction coordinator and just
>> > wants
>> > > > to
>> > > > > > > drive
>> > > > > > > > > > Kafka
>> > > > > > > > > > > to
>> > > > > > > > > > > > the desired state).  We could discuss it with Flink
>> > > folks,
>> > > > > the
>> > > > > > > > > current
>> > > > > > > > > > > > proposal was developed in collaboration with them.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
>> > > > > > > TransactionTimeoutMs
>> > > > > > > > to
>> > > > > > > > > > > > Integer.MAX_VALUE?
>> > > > > > > > > > > >
>> > > > > > > > > > > > The server would reject this for regular
>> transactions,
>> > it
>> > > > > only
>> > > > > > > > > accepts
>> > > > > > > > > > > > values that are <= *transaction.max.timeout.ms* (a
>> > > > > > > > > > > > broker config).
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator
>> > > expects
>> > > > > the
>> > > > > > > > > endTxn
>> > > > > > > > > > > > request to use the ongoing pid. ...
>> > > > > > > > > > > >
>> > > > > > > > > > > > Without 2PC there is no case where the pid could
>> change
>> > > > > between
>> > > > > > > > > > starting
>> > > > > > > > > > > a
>> > > > > > > > > > > > transaction and endTxn (InitProducerId would abort
>> any
>> > > > > ongoing
>> > > > > > > > > > > > transaction).  WIth 2PC there is now a case where
>> there
>> > > > could
>> > > > > > be
>> > > > > > > > > > > > InitProducerId that can change the pid without
>> aborting
>> > > the
>> > > > > > > > > > transaction,
>> > > > > > > > > > > so
>> > > > > > > > > > > > we need to handle that.  I wouldn't say that the
>> flow
>> > is
>> > > > > > > different,
>> > > > > > > > > but
>> > > > > > > > > > > > it's rather extended to handle new cases.  The main
>> > > > principle
>> > > > > > is
>> > > > > > > > > still
>> > > > > > > > > > > the
>> > > > > > > > > > > > same -- for all operations we use the latest
>> > > "operational"
>> > > > > pid
>> > > > > > > and
>> > > > > > > > > > epoch
>> > > > > > > > > > > > known to the client, this way we guarantee that we
>> can
>> > > > fence
>> > > > > > > > zombie /
>> > > > > > > > > > > split
>> > > > > > > > > > > > brain clients by disrupting the "latest known" pid +
>> > > epoch
>> > > > > > > > > progression.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 25. "We send out markers using the original
>> ongoing
>> > > > > > transaction
>> > > > > > > > > > > > ProducerId and ProducerEpoch" ...
>> > > > > > > > > > > >
>> > > > > > > > > > > > Updated.
>> > > > > > > > > > > >
>> > > > > > > > > > > > -Artem
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao
>> > > > > > <j...@confluent.io.invalid
>> > > > > > > >
>> > > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the reply.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 20. So for the dual-write recipe, we should always
>> > call
>> > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) from the
>> > producer?
>> > > > > Then,
>> > > > > > > > > should
>> > > > > > > > > > we
>> > > > > > > > > > > > > change the following in the example to use
>> > > > > > InitProducerId(true)
>> > > > > > > > > > > instead?
>> > > > > > > > > > > > > 1. InitProducerId(false); TC STATE: Empty,
>> > > ProducerId=42,
>> > > > > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1,
>> > > > NextProducerId=-1,
>> > > > > > > > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42,
>> > > > Epoch=MAX-1,
>> > > > > > > > > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
>> > > > > > > > > > > > > Also, could Flink just follow the dual-write
>> recipe?
>> > > It's
>> > > > > > > simpler
>> > > > > > > > > if
>> > > > > > > > > > > > there
>> > > > > > > > > > > > > is one way to solve the 2pc issue.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
>> > > > > > > TransactionTimeoutMs
>> > > > > > > > to
>> > > > > > > > > > > > > Integer.MAX_VALUE?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator
>> > > expects
>> > > > > the
>> > > > > > > > > endTxn
>> > > > > > > > > > > > > request to use the ongoing pid. With 2pc, the
>> > > coordinator
>> > > > > now
>> > > > > > > > > expects
>> > > > > > > > > > > the
>> > > > > > > > > > > > > endTxn request to use the next pid. So, the flow
>> is
>> > > > > > different,
>> > > > > > > > > right?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 25. "We send out markers using the original
>> ongoing
>> > > > > > transaction
>> > > > > > > > > > > > ProducerId
>> > > > > > > > > > > > > and ProducerEpoch"
>> > > > > > > > > > > > > We should use ProducerEpoch + 1 in the marker,
>> right?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Jun
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
>> > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 20.  I am a bit confused by how we set
>> > > > keepPreparedTxn.
>> > > > > > > ...
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > keepPreparedTxn=true informs the transaction
>> > > > coordinator
>> > > > > > that
>> > > > > > > > it
>> > > > > > > > > > > should
>> > > > > > > > > > > > > > keep the ongoing transaction, if any.  If the
>> > > > > > > > > > keepPreparedTxn=false,
>> > > > > > > > > > > > then
>> > > > > > > > > > > > > > any ongoing transaction is aborted (this is
>> exactly
>> > > the
>> > > > > > > current
>> > > > > > > > > > > > > behavior).
>> > > > > > > > > > > > > > enable2Pc is a separate argument that is
>> controlled
>> > > by
>> > > > > the
>> > > > > > > > > > > > > > *transaction.two.phase.commit.enable* setting on
>> > the
>> > > > > > client.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To start 2PC, the client just needs to set
>> > > > > > > > > > > > > > *transaction.two.phase.commit.enable*=true in
>> the
>> > > > config.
>> > > > > > > Then
>> > > > > > > > > if
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > client knows the status of the transaction
>> upfront
>> > > (in
>> > > > > the
>> > > > > > > case
>> > > > > > > > > of
>> > > > > > > > > > > > Flink,
>> > > > > > > > > > > > > > Flink keeps the knowledge if the transaction is
>> > > > prepared
>> > > > > in
>> > > > > > > its
>> > > > > > > > > own
>> > > > > > > > > > > > > store,
>> > > > > > > > > > > > > > so it always knows upfront), it can set
>> > > keepPreparedTxn
>> > > > > > > > > > accordingly,
>> > > > > > > > > > > > then
>> > > > > > > > > > > > > > if the transaction was prepared, it'll be ready
>> for
>> > > the
>> > > > > > > client
>> > > > > > > > to
>> > > > > > > > > > > > > complete
>> > > > > > > > > > > > > > the appropriate action; if the client doesn't
>> have
>> > a
>> > > > > > > knowledge
>> > > > > > > > > that
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > transaction is prepared, keepPreparedTxn is
>> going
>> > to
>> > > be
>> > > > > > > false,
>> > > > > > > > in
>> > > > > > > > > > > which
>> > > > > > > > > > > > > > case we'll get to a clean state (the same way
>> we do
>> > > > > today).
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > For the dual-write recipe, the client doesn't
>> know
>> > > > > upfront
>> > > > > > if
>> > > > > > > > the
>> > > > > > > > > > > > > > transaction is prepared, this information is
>> > > implicitly
>> > > > > > > encoded
>> > > > > > > > > > > > > > PreparedTxnState value that can be used to
>> resolve
>> > > the
>> > > > > > > > > transaction
>> > > > > > > > > > > > state.
>> > > > > > > > > > > > > > In that case, keepPreparedTxn should always be
>> > true,
>> > > > > > because
>> > > > > > > we
>> > > > > > > > > > don't
>> > > > > > > > > > > > > know
>> > > > > > > > > > > > > > upfront and we don't want to accidentally abort
>> a
>> > > > > committed
>> > > > > > > > > > > > transaction.
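>> > As a rough sketch of what this looks like on the client (the
>> > *transaction.two.phase.commit.enable* config and
>> > initTransactions(keepPreparedTxn) are the proposed API; the property
>> > values and surrounding code are only illustrative):
>> >
>> >   Properties props = new Properties();
>> >   props.put("transactional.id", "dual-write-app");
>> >   props.put("transaction.two.phase.commit.enable", "true");
>> >   Producer<String, String> producer = new KafkaProducer<>(props);
>> >
>> >   // Dual-write: we never know upfront whether a prepared transaction
>> >   // exists, so always keep it and resolve it via PreparedTxnState.
>> >   producer.initTransactions(true /* keepPreparedTxn */);
>> >
>> > A Flink-style client, by contrast, would pass keepPreparedTxn=true only
>> > when its own store says there is a prepared transaction to recover.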
>> >
>> > The forceTerminateTransaction call can just use keepPreparedTxn=false;
>> > it actually doesn't matter if it sets the Enable2Pc flag.
>> >
>> > > 21. TransactionLogValue: Do we need some field to identify whether
>> > > this is written for 2PC so that the ongoing txn is never auto
>> > > aborted?
>> >
>> > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
>> > enabled.  I've added a note to the KIP about this.
>> >
>> > > 22
>> >
>> > You're right, it's a typo.  I fixed it, as well as step 9 (REQUEST:
>> > ProducerId=73, ProducerEpoch=MAX).
>> >
>> > > 23. It's a bit weird that Enable2Pc is driven by a config while
>> > > KeepPreparedTxn is from an API param ...
>> >
>> > The intent to use 2PC doesn't change from transaction to transaction,
>> > but the intent to keep a prepared txn may change from transaction to
>> > transaction.  In dual-write recipes the distinction is not clear, but
>> > for use cases where the keepPreparedTxn value is known upfront (e.g.
>> > Flink) it's more prominent.  E.g. a Flink Kafka sink operator could be
>> > deployed with *transaction.two.phase.commit.enable*=true hardcoded in
>> > the image, but keepPreparedTxn cannot be hardcoded in the image,
>> > because it depends on the job manager's state.
>> >
>> > > 24
>> >
>> > The flow is actually going to work the same way as it does now -- the
>> > "main" producer id + epoch needs to be used in all operations to
>> > prevent fencing (it's sort of a common "header" in all RPC calls that
>> > follow the same rules).  The ongoing txn info is just additional info
>> > for making a commit / abort decision based on the PreparedTxnState from
>> > the DB.
>> >
>> > --Artem
>> >
>> > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid>
>> > wrote:
>> >
>> > > Hi, Artem,
>> > >
>> > > Thanks for the reply. A few more comments.
>> > >
>> > > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP,
>> > > I got the following: (1) to start 2pc, we call
>> > > InitProducerId(keepPreparedTxn=false); (2) when the producer fails
>> > > and needs to do recovery, it calls
>> > > InitProducerId(keepPreparedTxn=true); (3)
>> > > Admin.forceTerminateTransaction() calls
>> > > InitProducerId(keepPreparedTxn=false).
>> > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc
>> > > enabled, and there is an ongoing txn, should the server return an
>> > > error to the InitProducerId request? If so, what would be the error
>> > > code?
>> > > 20.2 How do we distinguish between (1) and (3)? It's the same API
>> > > call, but (1) doesn't abort the ongoing txn and (3) does.
>> > > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the
>> > > ongoing txn. So, setting keepPreparedTxn to false to start 2pc seems
>> > > counterintuitive.
>> > >
>> > > 21. TransactionLogValue: Do we need some field to identify whether
>> > > this is written for 2PC so that the ongoing txn is never auto
>> > > aborted?
>> > >
>> > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
>> > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
>> > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
>> > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
>> > > It seems in the above example, Epoch in the RESPONSE should be MAX to
>> > > match NextProducerEpoch?
>> > >
>> > > 23. It's a bit weird that Enable2Pc is driven by a config while
>> > > KeepPreparedTxn is from an API param. Should we make them more
>> > > consistent since they seem related?
>> > >
>> > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC
>> > > STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
>> > > PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE
>> > > ProducerId=85, Epoch=0. When a commit request is sent, it uses the
>> > > latest ProducerId and ProducerEpoch."
>> > > The step where we use the next producerId to commit an old txn works,
>> > > but it can be confusing. It's going to be hard for people
>> > > implementing this new client protocol to figure out when to use the
>> > > current or the new producerId in the EndTxnRequest. One potential way
>> > > to improve this is to extend EndTxnRequest with a new field like
>> > > expectedNextProducerId. Then we can always use the old producerId in
>> > > the existing field, but set expectedNextProducerId to bypass the
>> > > fencing logic when needed.
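>> > > For illustration, such a field in the EndTxnRequest schema could
>> > > look roughly like this (the name, tag, versions and wording below
>> > > are only a sketch, not something the KIP currently defines):
>> > >
>> > >   { "name": "ExpectedNextProducerId", "type": "int64",
>> > >     "versions": "X+", "taggedVersions": "X+", "tag": 0,
>> > >     "default": -1, "ignorable": true,
>> > >     "about": "The next producer id the client expects; when set, it
>> > >      bypasses the fencing check against the old producer id." }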
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
>> > > <alivsh...@confluent.io.invalid> wrote:
>> > > > Hi Jun,
>> > > >
>> > > > Thank you for the comments.
>> > > >
>> > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
>> > > >
>> > > > I added a note that all combinations are valid.  Enable2Pc=false &
>> > > > KeepPreparedTxn=true could be potentially useful for backward
>> > > > compatibility with Flink, when the new version of Flink that
>> > > > implements KIP-319 tries to work with a cluster that doesn't
>> > > > authorize 2PC.
>> > > >
>> > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
>> > > > > will OngoingTxnProducerId and OngoingTxnEpoch be set to?
>> > > >
>> > > > I added a note that they will be set to -1.  The client will then
>> > > > know that there is no ongoing txn and .completeTransaction becomes
>> > > > a no-op (but is still required before .send is enabled).
>> > > >
>> > > > > 12. ListTransactionsRequest related changes: It seems those are
>> > > > > already covered by KIP-994?
>> > > >
>> > > > Removed from this KIP.
>> > > >
>> > > > > 13. TransactionalLogValue ...
>> > > >
>> > > > This is now updated to work on top of KIP-890.
>> > > >
>> > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
>> > > > > the ongoing transaction ...
>> > > >
>> > > > This is now updated to work on top of KIP-890.
>> > > >
>> > > > > 15. active-transaction-total-time-max : ...
>> > > >
>> > > > Updated.
>> > > >
>> > > > > 16. "transaction.two.phase.commit.enable The default would be
>> > > > > 'false'. If it's 'false', 2PC functionality is disabled even if
>> > > > > the ACL is set ...
>> > > >
>> > > > Disabling 2PC effectively removes all authorization to use it,
>> > > > hence I thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be
>> > > > appropriate.
>> > > >
>> > > > Do you suggest using a different error code for 2PC authorization
>> > > > vs. some other authorization (e.g.
>> > > > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED), or a different code for
>> > > > disabled vs. unauthorized (e.g. TWO_PHASE_COMMIT_DISABLED), or
>> > > > both?
>> > > >
>> > > > > 17. completeTransaction(). We expect this to be only used during
>> > > > > recovery.
>> > > >
>> > > > It can also be used if, say, a commit to the database fails and the
>> > > > result is inconclusive, e.g.
>> > > >
>> > > > 1. Begin DB transaction
>> > > > 2. Begin Kafka transaction
>> > > > 3. Prepare Kafka transaction
>> > > > 4. Commit DB transaction
>> > > > 5. The DB commit fails, figure out the state of the transaction by
>> > > >    reading the PreparedTxnState from DB
>> > > > 6. Complete Kafka transaction with the PreparedTxnState.
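>> > > > As a rough sketch in client code (prepareTransaction,
>> > > > completeTransaction and PreparedTxnState are the proposed API; the
>> > > > db.* helpers are placeholders for application logic):
>> > > >
>> > > >   db.beginTransaction();                               // step 1
>> > > >   producer.beginTransaction();                         // step 2
>> > > >   // ... write to the DB and produce to Kafka ...
>> > > >   PreparedTxnState prepared = producer.prepareTransaction(); // 3
>> > > >   db.savePreparedTxnState(prepared);
>> > > >   try {
>> > > >     db.commitTransaction();                            // step 4
>> > > >     producer.commitTransaction();
>> > > >   } catch (Exception inconclusive) {
>> > > >     // steps 5-6: don't guess -- re-read what the DB actually
>> > > >     // committed and let that decide commit vs. abort in Kafka.
>> > > >     PreparedTxnState committed = db.readPreparedTxnState();
>> > > >     producer.completeTransaction(committed);
>> > > >   }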
>> > > >
>> > > > > 18. "either prepareTransaction was called or
>> > > > > initTransaction(true) was called": "either" should be "neither"?
>> > > >
>> > > > Updated.
>> > > >
>> > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
>> > > > > situation ...
>> > > >
>> > > > InitProducerId only bumps the producer epoch; the ongoing
>> > > > transaction epoch stays the same, no matter how many times
>> > > > InitProducerId is called before the transaction is completed.
>> > > > Eventually the epoch may overflow, and then a new producer id would
>> > > > be allocated, but the ongoing transaction producer id would stay
>> > > > the same.
>> > > >
>> > > > I've added a couple of examples in the KIP (
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
>> > > > ) that walk through some scenarios and show how the state is
>> > > > changed.
>> > > >
>> > > > -Artem
>> > > >
>> > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid>
>> > > > wrote:
>> > > > > Hi, Artem,
>> > > > >
>> > > > > Thanks for the KIP. A few comments below.
>> > > > >
>> > > > > 10. For the two new fields Enable2Pc and KeepPreparedTxn in
>> > > > > InitProducerId, it would be useful to document in a bit more
>> > > > > detail what values are set under what cases. For example, are
>> > > > > all four combinations valid?
>> > > > >
>> > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
>> > > > > will OngoingTxnProducerId and OngoingTxnEpoch be set to?
>> > > > >
>> > > > > 12. ListTransactionsRequest related changes: It seems those are
>> > > > > already covered by KIP-994?
>> > > > >
>> > > > > 13. TransactionalLogValue: Could we name TransactionProducerId
>> > > > > and ProducerId better? It's not clear from the names which is
>> > > > > for which.
>> > > > >
>> > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
>> > > > > the ongoing transaction is going to be written instead of the
>> > > > > existing ProducerId and ProducerEpoch fields (which are renamed
>> > > > > to reflect the semantics) to support downgrade.": I am a bit
>> > > > > confused by that. Are we writing different values to the
>> > > > > existing fields? Then, we can't downgrade, right?
>> > > > >
>> > > > > 15. active-transaction-total-time-max : Would
>> > > > > active-transaction-open-time-max be more intuitive? Also, could
>> > > > > we include the full name (group, tags, etc)?
>> > > > >
>> > > > > 16. "transaction.two.phase.commit.enable The default would be
>> > > > > 'false'. If it's 'false', 2PC functionality is disabled even if
>> > > > > the ACL is set; clients that attempt to use this functionality
>> > > > > would receive a TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
>> > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the
>> > > > > client to understand what the actual cause is.
>> > > > >
>> > > > > 17. completeTransaction(). We expect this to be only used during
>> > > > > recovery. Could we document this clearly? Could we prevent it
>> > > > > from being used incorrectly (e.g. throw an exception if the
>> > > > > producer has called other methods like send())?
>> > > > >
>> > > > > 18. "either prepareTransaction was called or
>> > > > > initTransaction(true) was called": "either" should be "neither"?
>> > > > >
>> > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
>> > > > > situation where there could be multiple outstanding txns. The
>> > > > > following is an example of a potential problem during recovery.
>> > > > >    The last txn epoch in the external store is 41 when the app
>> > > > >    dies. Instance1 is created for recovery.
>> > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true),
>> > > > >         epoch=42, ongoingEpoch=41
>> > > > >      2. (instance1) dies before completeTxn(41) can be called.
>> > > > >    Instance2 is created for recovery.
>> > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true),
>> > > > >         epoch=43, ongoingEpoch=42
>> > > > >      4. (instance2) completeTxn(41) => abort
>> > > > >    The first problem is that 41 is now aborted when it should be
>> > > > >    committed. The second is that it's not clear who could abort
>> > > > >    epoch 42, which is still open.
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
>> > > > > <jols...@confluent.io.invalid> wrote:
>> > > > >
>> > > > > > Hey Artem,
>> > > > > >
>> > > > > > Thanks for the updates. I think what you say makes sense. I
>> > > > > > just updated my KIP, so I want to reconcile some of the
>> > > > > > changes we made, especially with respect to the
>> > > > > > TransactionLogValue.
>> > > > > >
>> > > > > > Firstly, I believe tagged fields require a default value so
>> > > > > > that if they are not filled, we return the default (and know
>> > > > > > that they were empty). For my KIP, I proposed that the default
>> > > > > > for producer ID tagged fields should be -1. I was wondering if
>> > > > > > we could update the KIP to include the default values for
>> > > > > > producer ID and epoch.
>> > > > > >
>> > > > > > Next, I noticed we decided to rename the fields. I guess the
>> > > > > > field "NextProducerId" in my KIP correlates to "ProducerId" in
>> > > > > > this KIP. Is that correct? So we would have
>> > > > > > "TransactionProducerId" for the non-tagged field and have
>> > > > > > "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
>> > > > > > fields in the final version after KIP-890 and KIP-936 are
>> > > > > > implemented. Is this correct? I think the tags will need
>> > > > > > updating, but that is trivial.
>> > > > > >
>> > > > > > The final question I had was with respect to storing the new
>> > > > > > epoch. In KIP-890 part 2 (epoch bumps) I think we concluded
>> > > > > > that we don't need to store the epoch since we can interpret
>> > > > > > the previous epoch based on the producer ID. But here we could
>> > > > > > call InitProducerId multiple times, and we only want the
>> > > > > > producer with the correct epoch to be able to commit the
>> > > > > > transaction. Is that the correct reasoning for why we need the
>> > > > > > epoch here but not the Prepare/Commit state?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Justine
>> > > > > >
>> > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
>> > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > > > >
>> Hi Justine,
>>
>> After thinking a bit about supporting atomic dual writes for Kafka + a
>> NoSQL database, I came to the conclusion that we do need to bump the
>> epoch even with InitProducerId(keepPreparedTxn=true). As I described in
>> my previous email, we wouldn't need to bump the epoch to protect from
>> zombies, so that reasoning is still true. But we cannot protect from
>> split-brain scenarios when two or more instances of a producer with the
>> same transactional id try to produce at the same time. The dual-write
>> example for SQL databases
>> (https://github.com/apache/kafka/pull/14231/files) doesn't have a
>> split-brain problem because execution is protected by the update lock
>> on the transaction state record; however, NoSQL databases may not have
>> this protection (I'll write an example for NoSQL database dual-write
>> soon).
>>
>> In a nutshell, here is an example of a split-brain scenario:
>>
>>    1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
>>    2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
>>    3. (instance1) CommitTxn, epoch bumped to 43
>>    4. (instance2) CommitTxn, this is considered a retry, so it got
>>    epoch 43 as well
>>    5. (instance1) Produce messageA w/sequence 1
>>    6. (instance2) Produce messageB w/sequence 1, this is considered a
>>    duplicate
>>    7. (instance2) Produce messageC w/sequence 2
>>    8. (instance1) Produce messageD w/sequence 2, this is considered a
>>    duplicate
>>
>> Now if either of those commits the transaction, it would have a mix of
>> messages from the two instances (messageA and messageC). With the
>> proper epoch bump, instance1 would get fenced at step 3.
>>
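[Editor's note: the split-brain scenario above can be condensed into a toy model. This is illustrative Java only; the Coordinator class and its methods are invented stand-ins, not the actual Kafka transaction coordinator code. It shows why bumping the epoch on each InitProducerId fences the stale instance instead of letting it interleave messages.]

```java
// Toy model of the split-brain fencing argument; illustrative only,
// not the actual transaction coordinator implementation.
class Coordinator {
    private short epoch = 41;

    // Each InitProducerId bumps the epoch and returns the new value.
    short initProducerId() {
        return ++epoch;
    }

    // A commit is accepted only from the holder of the latest epoch.
    boolean commitTxn(short callerEpoch) {
        if (callerEpoch != epoch) {
            return false; // fenced, analogous to PRODUCER_FENCED
        }
        epoch++; // KIP-890: completing a transaction bumps the epoch again
        return true;
    }
}

public class FencingDemo {
    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        short instance1 = c.initProducerId(); // epoch=42
        short instance2 = c.initProducerId(); // epoch=43: the bump prevents
                                              // two live instances at 42
        System.out.println(c.commitTxn(instance1)); // false: instance1 fenced
        System.out.println(c.commitTxn(instance2)); // true
    }
}
```

With the bump at step 2, instance1's commit at epoch 42 is rejected outright, so the mixed-transaction outcome in the numbered scenario cannot occur.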
>> In order to update the epoch in InitProducerId(keepPreparedTxn=true) we
>> need to preserve the ongoing transaction's epoch (and producerId, if
>> the epoch overflows), because we'd need to make a correct decision when
>> we compare the PreparedTxnState that we read from the database with the
>> (producerId, epoch) of the ongoing transaction.
>>
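[Editor's note: a hedged sketch of that comparison. The PreparedTxnState name comes from the KIP's description; the record layout and helper below are illustrative, not the actual client or broker code. The recovery path commits the prepared Kafka transaction only when the identity stored in the database matches the ongoing transaction, and aborts otherwise.]

```java
// Illustrative sketch only -- field names mirror the KIP's description of
// PreparedTxnState; this is not actual Kafka code.
record TxnIdentity(long producerId, short epoch) {}

public class RecoveryDecision {
    // Commit the prepared Kafka transaction only if the (producerId, epoch)
    // recorded in the database matches the ongoing transaction; a mismatch
    // means the prepared transaction belongs to a different attempt, so abort.
    static boolean shouldCommit(TxnIdentity fromDb, TxnIdentity ongoing) {
        return fromDb.producerId() == ongoing.producerId()
            && fromDb.epoch() == ongoing.epoch();
    }

    public static void main(String[] args) {
        TxnIdentity ongoing = new TxnIdentity(7L, (short) 42);
        System.out.println(shouldCommit(new TxnIdentity(7L, (short) 42), ongoing)); // true
        System.out.println(shouldCommit(new TxnIdentity(7L, (short) 41), ongoing)); // false
    }
}
```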
>> I've updated the KIP with the following:
>>
>>    - Ongoing transaction now has 2 (producerId, epoch) pairs -- one
>>    pair describes the ongoing transaction, the other pair describes the
>>    expected epoch for operations on this transactional id
>>    - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
>>    - TransactionalLogValue now has 2 (producerId, epoch) pairs; the new
>>    values are added as tagged fields, so it's easy to downgrade
>>    - Added a note about downgrade in the Compatibility section
>>    - Added a rejected alternative
>>
>> -Artem
>>
>> On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io>
>> wrote:
>>
>> Hi Justine,
>>
>> Thank you for the questions. Currently (pre-KIP-939) we always bump the
>> epoch on InitProducerId and abort an ongoing transaction (if any). I
>> expect this behavior will continue with KIP-890 as well.
>>
>> With KIP-939 we need to support the case when the ongoing transaction
>> needs to be preserved when keepPreparedTxn=true. Bumping the epoch
>> without aborting or committing a transaction is tricky because the
>> epoch is a short value and it's easy to overflow. Currently, the
>> overflow case is handled by aborting the ongoing transaction, which
>> would send out transaction markers with epoch=Short.MAX_VALUE to the
>> partition leaders, which would fence off any messages with the producer
>> id that started the transaction (they would have an epoch that is less
>> than Short.MAX_VALUE). Then it is safe to allocate a new producer id
>> and use it in new transactions.
>>
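[Editor's note: a minimal sketch of that bump-or-reallocate rule. Illustrative only; the threshold below is an assumption inferred from the description above (MAX_VALUE being reserved for the fencing markers), and the real coordinator logic differs in detail.]

```java
// Illustrative sketch of an epoch bump with overflow handling; not Kafka
// source. Assumes Short.MAX_VALUE is reserved for fencing markers, per the
// description in the message above.
public class EpochBump {
    // Returns { producerId, epoch } for the next transaction: a plain bump,
    // or a fresh producer id at epoch 0 when the short epoch would reach
    // Short.MAX_VALUE.
    static long[] next(long producerId, short epoch, long nextFreeProducerId) {
        if (epoch >= Short.MAX_VALUE - 1) {
            return new long[] { nextFreeProducerId, 0 };
        }
        return new long[] { producerId, epoch + 1 };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(
            next(5L, (short) 10, 99L)));                        // [5, 11]
        System.out.println(java.util.Arrays.toString(
            next(5L, (short) (Short.MAX_VALUE - 1), 99L)));      // [99, 0]
    }
}
```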
>> We could say that maybe when keepPreparedTxn=true we bump the epoch
>> unless it leads to overflow, and don't bump the epoch in the overflow
>> case. I don't think that's a good solution, because if it's not safe to
>> keep the same epoch when keepPreparedTxn=true, then we must handle the
>> epoch overflow case as well. So either we should convince ourselves
>> that it's safe to keep the epoch and do it in the general case, or we
>> always bump the epoch and handle the overflow.
>>
>> With KIP-890, we bump the epoch on every transaction commit / abort.
>> This guarantees that even if InitProducerId(keepPreparedTxn=true)
>> doesn't increment the epoch on the ongoing transaction, the client will
>> have to call commit or abort to finish the transaction and will
>> increment the epoch (and handle epoch overflow, if needed). If the
>> ongoing transaction was in a bad state and had some zombies waiting to
>> arrive, the abort operation would fence them, because with KIP-890
>> every abort bumps the epoch.
>>
>> We could also look at this from the following perspective. With
>> KIP-890, zombies won't be able to cross transaction boundaries; each
>> transaction completion creates a boundary, and any activity in the past
>> gets confined in the boundary. Then data in any partition would look
>> like this:
>>
>> 1. message1, epoch=42
>> 2. message2, epoch=42
>> 3. message3, epoch=42
>> 4. marker (commit or abort), epoch=43
>>
>> Now if we inject steps 3a and 3b like this:
>>
>> 1. message1, epoch=42
>> 2. message2, epoch=42
>> 3. message3, epoch=42
>> 3a. crash
>> 3b. InitProducerId(keepPreparedTxn=true)
>> 4. marker (commit or abort), epoch=43
>>
>> The invariant still holds even with steps 3a and 3b -- whatever
>> activity was in the past will get confined in the past by the mandatory
>> abort / commit that must follow InitProducerId(keepPreparedTxn=true).
>>
>> So KIP-890 provides proper isolation between transactions, and
>> injecting crash + InitProducerId(keepPreparedTxn=true) into the
>> transaction sequence is safe from the zombie-protection perspective.
>>
>> That said, I'm still thinking about it and looking for cases that might
>> break because we don't bump the epoch on
>> InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need
>> to develop the logic to handle epoch overflow for ongoing transactions.
>>
>> -Artem
>>
>> On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
>> <jols...@confluent.io.invalid> wrote:
>>
>> Hey Artem,
>>
>> Thanks for the KIP. I had a question about epoch bumping.
>>
>> Previously when we send an InitProducerId request on producer startup,
>> we bump the epoch and abort the transaction. Is it correct to assume
>> that we will still bump the epoch, but just not abort the transaction?
>> If we still bump the epoch in this case, how does this interact with
>> KIP-890, where we also bump the epoch on every transaction? (I think
>> this means that we may skip epochs and the data itself will all have
>> the same epoch.)
>>
>> I may have follow-ups depending on the answer to this. :)
>>
>> Thanks,
>> Justine
>>
>> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>> Hi Alex,
>>
>> Thank you for your questions.
>>
>> > the purpose of having broker-level
>> > transaction.two.phase.commit.enable
>>
>> The thinking is that 2PC is a bit of an advanced construct, so enabling
>> 2PC in a Kafka cluster should be an explicit decision. If it is set to
>> 'false', InitProducerId (and initTransactions) would return
>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>>
>> > WDYT about adding an AdminClient method that returns the state of
>> > transaction.two.phase.commit.enable
>>
>> I wonder if the client could just try to use 2PC and then handle the
>> error (e.g. if it needs to fall back to ordinary transactions). This
>> way it could uniformly handle cases when the Kafka cluster doesn't
>> support 2PC at all and cases when 2PC is restricted to certain users.
>> We could also expose this config in describeConfigs, if the fallback
>> approach doesn't work for some scenarios.
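[Editor's note: a sketch of that try-2PC-then-fall-back pattern. The Client interface and the use of UnsupportedOperationException are stand-ins, not the real producer API; in practice the failure would surface as the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error mentioned above.]

```java
// Hedged sketch of "try 2PC, fall back to ordinary transactions".
// Client and UnsupportedOperationException are stand-ins for the real
// producer API and the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error.
interface Client {
    void initTransactions(boolean enableTwoPhaseCommit);
}

public class TwoPcFallback {
    // Returns true if 2PC is in use after initialization, false if we
    // fell back to ordinary transactions.
    static boolean init(Client client) {
        try {
            client.initTransactions(true);
            return true;
        } catch (UnsupportedOperationException e) {
            client.initTransactions(false); // fall back
            return false;
        }
    }

    public static void main(String[] args) {
        // A broker that rejects 2PC:
        Client no2pc = twoPc -> {
            if (twoPc) throw new UnsupportedOperationException("2PC disabled");
        };
        System.out.println(init(no2pc));        // false: fell back
        System.out.println(init(twoPc -> {}));  // true: 2PC accepted
    }
}
```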
>>
>> -Artem
>>
>> On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
>> <asorokou...@confluent.io.invalid> wrote:
>>
>> Hi Artem,
>>
>> Thanks for publishing this KIP!
>>
>> Can you please clarify the purpose of having the broker-level
>> transaction.two.phase.commit.enable config in addition to the new ACL?
>> If the brokers are configured with
>> transaction.two.phase.commit.enable=false, at what point will a client
>> configured with transaction.two.phase.commit.enable=true fail? Will it
>> happen at KafkaProducer#initTransactions?
>>
>> WDYT about adding an AdminClient method that returns the state of
>> transaction.two.phase.commit.enable? This way, clients would know in
>> advance if 2PC is enabled on the brokers.
>>
>> Best,
>> Alex
>>
>> On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
>> wrote:
>>
>> Other than supporting multiplexing transactional streams on a single
>> producer, I don't see how to improve it.
>>
>> On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>> Hi Roger,
>>
>> Thank you for summarizing the cons. I agree, and I'm curious what the
>> alternatives to solve these problems better would be and whether they
>> can be incorporated into this proposal (or built independently, in
>> addition to or on top of this proposal). E.g. one potential extension
>> we discussed earlier in the thread could be multiplexing logical
>> transactional "streams" with a single producer.
>>
>> -Artem
>>
>> On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
>> wrote:
>>
>> Thanks. I like that you're moving Kafka toward supporting this
>> dual-write pattern. Each use case needs to consider the tradeoffs. You
>> already summarized the pros very well in the KIP. I would summarize the
>> cons as follows:
>>
>> - you sacrifice availability - each write requires both DB and Kafka to
>>   be available, so your overall application availability is
>>   (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
>> - latency will be higher and throughput lower - each write requires
>>   both writes to DB and Kafka while holding an exclusive lock in DB.
>> - you need to create a producer per unit of concurrency in your app,
>>   which has some overhead in the app and on the Kafka side (number of
>>   connections, poor batching). I assume the producers would need to be
>>   configured for low latency (linger.ms=0)
>> - there's some complexity in managing stable transactional ids for each
>>   producer/concurrency unit in your application. With a k8s deployment,
>>   you may need to switch to something like a StatefulSet that gives
>>   each pod a stable identity across restarts. On top of that pod
>>   identity which
>> > > > > > > > > > > > > > > > > > > >> > > you
>> > > > > > > > > > > > > > > > > > > >> > > > > can
>> > > > > > > > > > > > > > > > > > > >> > > > > > use as a prefix, you then
>> > > assign
>> > > > > > > unique
>> > > > > > > > > > > > > > transactional
>> > > > > > > > > > > > > > > > ids
>> > > > > > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > > > >> each
>> > > > > > > > > > > > > > > > > > > >> > > > > > concurrency unit
>> > > > > (thread/goroutine).
>> > > > > > > > > > > > > > > > > > > >> > > > > >
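The id scheme in the last bullet above can be sketched as follows. This is a minimal illustration; the pod identity, naming convention, and helper function are all hypothetical, not part of KIP-939 or the Kafka client API:

```python
# Hypothetical sketch of the transactional-id scheme described above: a
# stable pod identity (e.g. a StatefulSet pod name like "payments-app-3")
# is used as a prefix, and each concurrency unit (thread/goroutine) gets
# a deterministic suffix.  Names here are illustrative only.

def transactional_ids(pod_identity: str, concurrency: int) -> list[str]:
    # Ids stay stable across restarts as long as the pod keeps its
    # identity and the concurrency-unit indexing is deterministic.
    return [f"{pod_identity}-txn-{unit}" for unit in range(concurrency)]

print(transactional_ids("payments-app-3", 3))
# → ['payments-app-3-txn-0', 'payments-app-3-txn-1', 'payments-app-3-txn-2']
```

Each id would then be set as the `transactional.id` of a dedicated producer, one per concurrency unit.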
>> > > > > > > > > > > > > > > > > > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
>> > > > > > > > > > > > > > > > > > > >> > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > Hi Roger,
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > Thank you for the feedback.  You make a very good point that we
>> > > > > > > > > > > > > > > > > > > >> > > > > > > also discussed internally.  Adding support for multiple concurrent
>> > > > > > > > > > > > > > > > > > > >> > > > > > > transactions in one producer could be valuable, but it seems to be
>> > > > > > > > > > > > > > > > > > > >> > > > > > > a fairly large and independent change that would deserve a separate
>> > > > > > > > > > > > > > > > > > > >> > > > > > > KIP.  If such support is added, we could modify the 2PC
>> > > > > > > > > > > > > > > > > > > >> > > > > > > functionality to incorporate it.
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Maybe not too bad but a bit of pain to manage these ids inside
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > each process and across all application processes.
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > I'm not sure if supporting multiple transactions in one producer
>> > > > > > > > > > > > > > > > > > > >> > > > > > > would make id management simpler: we'd need to store a piece of
>> > > > > > > > > > > > > > > > > > > >> > > > > > > data per transaction, so whether it's N producers with a single
>> > > > > > > > > > > > > > > > > > > >> > > > > > > transaction each or N transactions with a single producer, it's
>> > > > > > > > > > > > > > > > > > > >> > > > > > > still roughly the same amount of data to manage.  In fact, managing
>> > > > > > > > > > > > > > > > > > > >> > > > > > > transactional ids (current proposal) might be easier, because the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > id is controlled by the application and it knows how to complete
>> > > > > > > > > > > > > > > > > > > >> > > > > > > the transaction after a crash / restart; a TID would be generated
>> > > > > > > > > > > > > > > > > > > >> > > > > > > by Kafka, which raises the question of what happens if the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > application starts a Kafka transaction, crashes before saving its
>> > > > > > > > > > > > > > > > > > > >> > > > > > > TID, and then has to figure out which transactions to abort, etc.
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency slot in
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > the application
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > This is a very valid concern.  Maybe we'd need to have some
>> > > > > > > > > > > > > > > > > > > >> > > > > > > multiplexing of transactional logical "streams" over the same
>> > > > > > > > > > > > > > > > > > > >> > > > > > > connection.  Seems like a separate KIP, though.
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded model per
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > application process?
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > That's a fair assessment.  Not necessarily exactly single-threaded
>> > > > > > > > > > > > > > > > > > > >> > > > > > > per application, but a single producer per thread model (i.e. an
>> > > > > > > > > > > > > > > > > > > >> > > > > > > application could have a pool of threads + producers to increase
>> > > > > > > > > > > > > > > > > > > >> > > > > > > concurrency).
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > -Artem
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover
>> > > > > > > > > > > > > > > > > > > >> > > > > > > <roger.hoo...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Artem,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the reply.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > If I understand correctly, Kafka does not support concurrent
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > transactions from the same producer (transactional id).  I think
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > this means that applications that want to support in-process
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > concurrency (say thread-level concurrency with row-level DB
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > locking) would need to manage separate transactional ids and
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > producers per thread and then store txn state accordingly.  The
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > potential usability downsides I see are
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > 1) managing a set of transactional ids for each application
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > process that scales up to its max concurrency.  Maybe not too
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > bad but a bit of pain to manage these ids inside each process
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > and across all application processes.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency slot in
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > the application - this could create a lot more producers and
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > resultant connections to Kafka than the typical model of a
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > single producer per process.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded model per
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > application process?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > Roger
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > Hi Roger, Arjun,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > Thank you for the questions.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > It looks like the application must have stable
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > transactional ids over time?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > The transactional id should uniquely identify a producer
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > instance and needs to be stable across restarts.  If the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > transactional id is not stable across restarts, then zombie
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > messages from a previous incarnation of the producer may
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > violate atomicity.  If there are 2 producer instances
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > concurrently producing data with the same transactional id,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > they are going to constantly fence each other and most likely
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > make little or no progress.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
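The fencing behavior described above can be modeled with a toy simulation. This is my own simplification for illustration, not actual broker or client code: the broker tracks a producer epoch per transactional id, bumps it on each initialization, and rejects writes carrying a stale epoch.

```python
# Toy model of transactional-id fencing: each producer initialization
# bumps the broker-side epoch for that id, and writes with an older
# epoch are rejected.  Class and method names are illustrative only.

class FencingBroker:
    def __init__(self):
        self.epochs: dict[str, int] = {}  # transactional id -> current epoch

    def init_producer(self, txn_id: str) -> int:
        # Each (re)initialization bumps the epoch, fencing older holders.
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def produce(self, txn_id: str, epoch: int) -> str:
        return "OK" if epoch == self.epochs[txn_id] else "PRODUCER_FENCED"

broker = FencingBroker()
first = broker.init_producer("web-app-txn-0")   # first instance
second = broker.init_producer("web-app-txn-0")  # concurrent second instance
print(broker.produce("web-app-txn-0", first))   # → PRODUCER_FENCED
print(broker.produce("web-app-txn-0", second))  # → OK
```

If the second instance were itself restarted and re-initialized, it would in turn fence the first two, which is why two live instances sharing an id make little or no progress.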
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > The name might be a little bit confusing as it may be mistaken
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > for a transaction id / TID that uniquely identifies every
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > transaction.  The name and the semantics were defined in the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > original exactly-once-semantics (EoS) proposal (
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > ) and KIP-939 just builds on top of that.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > I'm curious to understand what happens if the producer
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > dies, and does not come up and recover the pending
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > transaction within the transaction timeout interval.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > If the producer / application never comes back, the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > transaction will remain in prepared (a.k.a. "in-doubt") state
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > until an operator forcefully terminates the transaction.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > That's why a new ACL is defined in this proposal -- this
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > functionality should only be provided to applications that
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > implement proper recovery logic.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
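The lifecycle described above can be summarized with a small state-machine sketch (my own simplification for illustration; the state and method names are hypothetical, not broker code): an ordinary ongoing transaction is aborted when it times out, while a prepared 2PC transaction stays in-doubt until the producer completes it or an operator forcefully terminates it.

```python
# Simplified sketch of the transaction lifecycle discussed above.  An
# ordinary ongoing transaction is aborted on timeout; a prepared
# ("in-doubt") 2PC transaction is not, and waits for the producer to
# recover or for an operator to force-terminate it.  Names are
# hypothetical, not part of the actual broker implementation.

class TxnState:
    def __init__(self):
        self.state = "ONGOING"

    def prepare(self):
        # 2PC phase one completed; the transaction is now in-doubt.
        self.state = "PREPARED"

    def on_timeout(self):
        # The transaction timeout only aborts ordinary transactions.
        if self.state == "ONGOING":
            self.state = "ABORTED"

    def operator_terminate(self):
        # Requires the new ACL proposed in the KIP.
        if self.state == "PREPARED":
            self.state = "ABORTED"

txn = TxnState()
txn.prepare()
txn.on_timeout()
print(txn.state)            # → PREPARED (timeout had no effect)
txn.operator_terminate()
print(txn.state)            # → ABORTED
```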
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > -Artem
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > <arjun.sat...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Hello Artem,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Thanks for the KIP.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > I have the same question as Roger on concurrent writes,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > and an additional one on consumer behavior.  Typically,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > transactions will time out if not committed within some
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > time interval.  With the proposed changes in this KIP,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > consumers cannot consume past the ongoing transaction.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > I'm curious to understand what happens if the producer
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > dies and does not come up and recover the pending
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > transaction within the transaction timeout interval.  Or
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > are we saying that, when used in this 2PC context, we
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > should configure these transaction timeouts to very large
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > durations?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Thanks in advance!
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Arjun
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > <roger.hoo...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Hi Artem,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Thanks for writing this KIP.  Can you clarify the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > requirements a bit more for managing transaction state?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > It looks like the application must have stable
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > transactional ids over time?  What is the granularity of
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > those ids and producers?  Say the application is a
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > multi-threaded Java web server, can/should all the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > concurrent threads share a transactional id and producer?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > That doesn't seem right to me unless the application is
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > using global DB locks that serialize all requests.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Instead, if the application uses row-level DB locks,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > there could be multiple, concurrent, independent txns
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > happening in the same JVM so it seems like the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > granularity of managing transactional ids and txn state needs
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > > > line
>> > > > > > > > > > > > > > > > up
>> > > > > > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > > > > >> > > > > granularity
>> > > > > > > > > > > > > > > > > > > >> > > > > > > of
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > DB
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > locking.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Does that make
>> > sense
>> > > > or
>> > > > > > am I
>> > > > > > > > > > > > > > > misunderstanding?
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Roger
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > On Wed, Aug 16,
>> > 2023
>> > > > at
>> > > > > > > > 11:40 PM
>> > > > > > > > > > > Artem
>> > > > > > > > > > > > > > > > Livshits
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > <
>> > > > alivsh...@confluent.io
>> > > > > > > > > .invalid>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > Hello,
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > This is a
>> > > discussion
>> > > > > > > thread
>> > > > > > > > > for
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > .
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > The KIP
>> proposes
>> > > > > > extending
>> > > > > > > > > Kafka
>> > > > > > > > > > > > > > > transaction
>> > > > > > > > > > > > > > > > > > > support
>> > > > > > > > > > > > > > > > > > > >> > > (that
>> > > > > > > > > > > > > > > > > > > >> > > > > > > already
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > uses
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > 2PC
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > under the
>> hood)
>> > to
>> > > > > > enable
>> > > > > > > > > > > atomicity
>> > > > > > > > > > > > of
>> > > > > > > > > > > > > > > dual
>> > > > > > > > > > > > > > > > > > writes
>> > > > > > > > > > > > > > > > > > > >> to
>> > > > > > > > > > > > > > > > > > > >> > > Kafka
>> > > > > > > > > > > > > > > > > > > >> > > > > and
>> > > > > > > > > > > > > > > > > > > >> > > > > > > an
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > external
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > database, and
>> > > helps
>> > > > to
>> > > > > > > fix a
>> > > > > > > > > > long
>> > > > > > > > > > > > > > standing
>> > > > > > > > > > > > > > > > > Flink
>> > > > > > > > > > > > > > > > > > > >> issue.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > An example of
>> > code
>> > > > > that
>> > > > > > > uses
>> > > > > > > > > the
>> > > > > > > > > > > > dual
>> > > > > > > > > > > > > > > write
>> > > > > > > > > > > > > > > > > > recipe
>> > > > > > > > > > > > > > > > > > > >> with
>> > > > > > > > > > > > > > > > > > > >> > > > JDBC
>> > > > > > > > > > > > > > > > > > > >> > > > > > and
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > should
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > work for most
>> > SQL
>> > > > > > > databases
>> > > > > > > > is
>> > > > > > > > > > > here
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/14231.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > The FLIP for
>> the
>> > > > > sister
>> > > > > > > fix
>> > > > > > > > in
>> > > > > > > > > > > Flink
>> > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > here
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > -Artem
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >
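For readers following the thread, the dual-write recipe the announcement refers to can be sketched as a toy simulation. This is NOT the code from the linked PR and uses no real Kafka or JDBC APIs; `FakeKafkaProducer`, `InMemoryDb`, and `dual_write` are hypothetical stand-ins that only illustrate the ordering of the two phases: produce, prepare the Kafka transaction, commit the DB transaction (the decision point, which also persists the prepared-transaction state), then commit the prepared Kafka transaction.

```python
class InMemoryDb:
    """Stand-in for the external database.  Business rows and the prepared
    Kafka transaction state are written in one atomic DB transaction."""
    def __init__(self):
        self.rows = []
        self.prepared_txn = None  # state of the last prepared Kafka txn

    def commit(self, row, txn_state):
        # One atomic DB commit: business data + Kafka txn state together.
        self.rows.append(row)
        self.prepared_txn = txn_state


class FakeKafkaProducer:
    """Stand-in for a 2PC-enabled producer (hypothetical method names)."""
    def __init__(self):
        self.committed = []
        self._pending = []

    def begin_transaction(self):
        self._pending = []

    def send(self, record):
        self._pending.append(record)

    def prepare_transaction(self):
        # First phase: the transaction becomes "in-doubt" and survives
        # until explicitly committed or aborted.
        return list(self._pending)

    def commit_transaction(self, prepared):
        # Second phase: make the prepared records visible.
        self.committed.extend(prepared)


def dual_write(db, producer, row, record):
    producer.begin_transaction()
    producer.send(record)
    prepared = producer.prepare_transaction()  # Kafka side is now in-doubt
    db.commit(row, prepared)                   # decision point: DB commit
    producer.commit_transaction(prepared)      # second phase


db = InMemoryDb()
producer = FakeKafkaProducer()
dual_write(db, producer, row="order-1", record="event-1")
print(db.rows)             # ['order-1']
print(producer.committed)  # ['event-1']
```

On restart after a crash between the DB commit and the Kafka commit, the state stored in the DB tells the application whether the in-doubt Kafka transaction should be committed or aborted, which is what makes the two writes atomic.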
