Hi Jun,

>  Then, should we change the following in the example to use
> InitProducerId(true) instead?

We could. I just thought that it's good to make the example self-contained
by starting from a clean state.

> Also, could Flink just follow the dual-write recipe?

I think it would bring some unnecessary logic into Flink (or any other system
that already has a transaction coordinator and just wants to drive Kafka to
the desired state).  We could discuss it with the Flink folks; the current
proposal was developed in collaboration with them.

> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> Integer.MAX_VALUE?

The server would reject this for regular transactions; it only accepts
values that are <= *transaction.max.timeout.ms* (a broker config).
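
For illustration, a minimal sketch of what that rejection looks like from the
client side (the broker compares the requested timeout against
*transaction.max.timeout.ms*, 15 minutes by default, and fails InitProducerId
with INVALID_TRANSACTION_TIMEOUT):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-app");
    // A non-2PC producer asking for an effectively unbounded timeout:
    props.put("transaction.timeout.ms", Integer.toString(Integer.MAX_VALUE));
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(
            props, new StringSerializer(), new StringSerializer())) {
        producer.initTransactions();  // rejected: INVALID_TRANSACTION_TIMEOUT
    }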

> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> request to use the ongoing pid. ...

Without 2PC there is no case where the pid could change between starting a
transaction and endTxn (InitProducerId would abort any ongoing
transaction).  With 2PC there is now a case where InitProducerId can change
the pid without aborting the transaction, so we need to handle that.  I
wouldn't say that the flow is different; it's rather extended to handle new
cases.  The main principle is still the same -- for all operations we use
the latest "operational" pid and epoch known to the client; this way we
guarantee that we can fence zombie / split-brain clients by disrupting the
"latest known" pid + epoch progression.
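
To make the extended flow concrete, here is a rough sketch using the client
API proposed in this KIP (initTransactions(boolean), PreparedTxnState and
completeTransaction are the proposed names; readPreparedTxnStateFromStore is
a hypothetical application helper):

    // Recovery: keep the prepared txn, get the latest "operational" pid/epoch.
    producer.initTransactions(true);  // keepPreparedTxn=true
    // The application's own store says what was prepared (for dual-write, the DB).
    PreparedTxnState stored = readPreparedTxnStateFromStore();
    // Commits if `stored` matches the ongoing transaction's (producerId, epoch),
    // aborts otherwise; either way the RPC carries the latest pid + epoch, which
    // is exactly the progression that fences zombie / split-brain clients.
    producer.completeTransaction(stored);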

> 25. "We send out markers using the original ongoing transaction
ProducerId and ProducerEpoch" ...

Updated.

-Artem

On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. So for the dual-write recipe, we should always call
> InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> change the following in the example to use InitProducerId(true) instead?
> 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> Also, could Flink just follow the dual-write recipe? It's simpler if there
> is one way to solve the 2pc issue.
>
> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> Integer.MAX_VALUE?
>
> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> request to use the ongoing pid. With 2pc, the coordinator now expects the
> endTxn request to use the next pid. So, the flow is different, right?
>
> 25. "We send out markers using the original ongoing transaction ProducerId
> and ProducerEpoch"
> We should use ProducerEpoch + 1 in the marker, right?
>
> Jun
>
> On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
> >
> > keepPreparedTxn=true informs the transaction coordinator that it should
> > keep the ongoing transaction, if any.  If keepPreparedTxn=false, then any
> > ongoing transaction is aborted (this is exactly the current behavior).
> > enable2Pc is a separate argument that is controlled by the
> > *transaction.two.phase.commit.enable* setting on the client.
> >
> > To start 2PC, the client just needs to set
> > *transaction.two.phase.commit.enable*=true in the config.  Then, if the
> > client knows the status of the transaction upfront (in the case of Flink,
> > Flink keeps the knowledge of whether the transaction is prepared in its
> > own store, so it always knows upfront), it can set keepPreparedTxn
> > accordingly: if the transaction was prepared, it'll be ready for the
> > client to complete the appropriate action; if the client doesn't know
> > that the transaction is prepared, keepPreparedTxn is going to be false,
> > in which case we'll get to a clean state (the same way we do today).
> >
> > For the dual-write recipe, the client doesn't know upfront if the
> > transaction is prepared; this information is implicitly encoded in the
> > PreparedTxnState value that can be used to resolve the transaction state.
> > In that case, keepPreparedTxn should always be true, because we don't
> > know upfront and we don't want to accidentally abort a committed
> > transaction.
> >
> > The forceTerminateTransaction call can just use keepPreparedTxn=false; it
> > actually doesn't matter whether it sets the Enable2Pc flag.
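> >
> > As a sketch (using the Admin API proposed in this KIP; error handling
> > omitted):
> >
> >     try (Admin admin = Admin.create(adminProps)) {
> >         // Internally issues InitProducerId(keepPreparedTxn=false), which
> >         // bumps the epoch and aborts the prepared transaction.
> >         admin.forceTerminateTransaction("my-transactional-id");
> >     }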
> >
> > > 21. TransactionLogValue: Do we need some field to identify whether this
> > > is written for 2PC so that ongoing txn is never auto aborted?
> >
> > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> > enabled.  I've added a note to the KIP about this.
> >
> > > 22
> >
> > You're right, it's a typo.  I fixed it, as well as step 9 (REQUEST:
> > ProducerId=73, ProducerEpoch=MAX).
> >
> > > 23. It's a bit weird that Enable2Pc is driven by a config while
> > > KeepPreparedTxn is from an API param ...
> >
> > The intent to use 2PC doesn't change from transaction to transaction, but
> > the intent to keep a prepared txn may change from transaction to
> > transaction.  In dual-write recipes the distinction is not clear, but for
> > use cases where the keepPreparedTxn value is known upfront (e.g. Flink)
> > it's more prominent.  E.g. Flink's Kafka sink operator could be deployed
> > with *transaction.two.phase.commit.enable*=true hardcoded in the image,
> > but keepPreparedTxn cannot be hardcoded in the image, because it depends
> > on the job manager's state.
> >
> > > 24
> >
> > The flow is actually the same as it is now -- the "main" producer id +
> > epoch needs to be used in all operations to prevent fencing (it's sort of
> > a common "header" in all RPC calls that follows the same rules).  The
> > ongoing txn info is just additional info for making a commit / abort
> > decision based on the PreparedTxnState from the DB.
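> >
> > A compact sketch of that decision point (assuming, as in the KIP's JDBC
> > example, that the PreparedTxnState string is stored in the DB in the same
> > transaction as the business data; stateStringFromDb is hypothetical):
> >
> >     producer.initTransactions(true);
> >     // Rehydrate the state recorded by the last successful DB commit:
> >     PreparedTxnState dbState = new PreparedTxnState(stateStringFromDb);
> >     // Match with the ongoing txn's (ProducerId, Epoch) -> the DB committed,
> >     // so commit in Kafka; mismatch -> the DB didn't commit, so abort.
> >     producer.completeTransaction(dbState);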
> >
> > --Artem
> >
> > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply. A few more comments.
> > >
> > > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I got
> > > the following: (1) to start 2pc, we call
> > > InitProducerId(keepPreparedTxn=false); (2) when the producer fails and
> > > needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3)
> > > Admin.forceTerminateTransaction() calls
> > > InitProducerId(keepPreparedTxn=false).
> > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc enabled,
> > > and there is an ongoing txn, should the server return an error to the
> > > InitProducerId request? If so, what would be the error code?
> > > 20.2 How do we distinguish between (1) and (3)? It's the same API call but
> > > (1) doesn't abort the ongoing txn and (3) does.
> > > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing
> > > txn. So, setting keepPreparedTxn to false to start 2pc seems counter
> > > intuitive.
> > >
> > > 21. TransactionLogValue: Do we need some field to identify whether this is
> > > written for 2PC so that ongoing txn is never auto aborted?
> > >
> > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
> > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
> > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
> > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > It seems in the above example, Epoch in RESPONSE should be MAX to match
> > > NextProducerEpoch?
> > >
> > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > while KeepPreparedTxn is from an API param. Should we make them more
> > > consistent since they seem related?
> > >
> > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE:
> > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73,
> > > NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0.
> > > When a commit request is sent, it uses the latest ProducerId and
> > > ProducerEpoch."
> > > The step where we use the next producerId to commit an old txn works, but
> > > can be confusing. It's going to be hard for people implementing this new
> > > client protocol to figure out when to use the current or the new producerId
> > > in the EndTxnRequest. One potential way to improve this is to extend
> > > EndTxnRequest with a new field like expectedNextProducerId. Then we can
> > > always use the old producerId in the existing field, but set
> > > expectedNextProducerId to bypass the fencing logic when needed.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thank you for the comments.
> > > >
> > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
> > > >
> > > > I added a note that all combinations are valid.  Enable2Pc=false &
> > > > KeepPreparedTxn=true could be potentially useful for backward
> > > > compatibility with Flink, when the new version of Flink that implements
> > > > FLIP-319 tries to work with a cluster that doesn't authorize 2PC.
> > > >
> > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what will
> > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > >
> > > > I added a note that they will be set to -1.  The client will then know
> > > > that there is no ongoing txn and .completeTransaction becomes a no-op
> > > > (but is still required before .send is enabled).
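> > > >
> > > > E.g. a clean-start sketch (values per the note above; the method names
> > > > are the ones proposed in the KIP, stateFromAppStore is hypothetical):
> > > >
> > > >     producer.initTransactions(true);
> > > >     // Response carried OngoingTxnProducerId=-1, OngoingTxnEpoch=-1, so
> > > >     // this resolves immediately as a no-op:
> > > >     producer.completeTransaction(stateFromAppStore);
> > > >     producer.beginTransaction();  // .send is enabled only after the above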
> > > >
> > > > > 12. ListTransactionsRequest related changes: It seems those are
> > > > > already covered by KIP-994?
> > > >
> > > > Removed from this KIP.
> > > >
> > > > > 13. TransactionalLogValue ...
> > > >
> > > > This is now updated to work on top of KIP-890.
> > > >
> > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the
> > > > > ongoing transaction ...
> > > >
> > > > This is now updated to work on top of KIP-890.
> > > >
> > > > > 15. active-transaction-total-time-max : ...
> > > >
> > > > Updated.
> > > >
> > > > > 16. "transaction.two.phase.commit.enable The default would be ‘false’.
> > > > > If it’s ‘false’, 2PC functionality is disabled even if the ACL is set ...
> > > >
> > > > Disabling 2PC effectively removes all authorization to use it, hence I
> > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate.
> > > >
> > > > Do you suggest using a different error code for 2PC authorization vs some
> > > > other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) or a
> > > > different code for disabled vs. unauthorized (e.g.
> > > > TWO_PHASE_COMMIT_DISABLED) or both?
> > > >
> > > > > 17. completeTransaction(). We expect this to be only used during
> > > > > recovery.
> > > >
> > > > It can also be used if, say, a commit to the database fails and the
> > > > result is inconclusive, e.g.
> > > >
> > > > 1. Begin DB transaction
> > > > 2. Begin Kafka transaction
> > > > 3. Prepare Kafka transaction
> > > > 4. Commit DB transaction
> > > > 5. The DB commit fails with an inconclusive result, so figure out the
> > > > state of the transaction by reading the PreparedTxnState from the DB
> > > > 6. Complete Kafka transaction with the PreparedTxnState.
> > > >
> > > > > 18. "either prepareTransaction was called or initTransaction(true) was
> > > > > called": "either" should be "neither"?
> > > >
> > > > Updated.
> > > >
> > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
> > > > > situation ...
> > > >
> > > > InitProducerId only bumps the producer epoch; the ongoing transaction
> > > > epoch stays the same, no matter how many times InitProducerId is called
> > > > before the transaction is completed.  Eventually the epoch may overflow,
> > > > and then a new producer id would be allocated, but the ongoing
> > > > transaction producer id would stay the same.
> > > >
> > > > I've added a couple of examples in the KIP (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > )
> > > > that walk through some scenarios and show how the state is changed.
> > > >
> > > > -Artem
> > > >
> > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the KIP. A few comments below.
> > > > >
> > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn in
> > > > > InitProducerId, it would be useful to document a bit more detail on what
> > > > > values are set under what cases. For example, are all four combinations
> > > > > valid?
> > > > >
> > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what will
> > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > >
> > > > > 12. ListTransactionsRequest related changes: It seems those are already
> > > > > covered by KIP-994?
> > > > >
> > > > > 13. TransactionalLogValue: Could we name TransactionProducerId and
> > > > > ProducerId better? It's not clear from the name which is for which.
> > > > >
> > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the
> > > > > ongoing transaction is going to be written instead of the existing
> > > > > ProducerId and ProducerEpoch fields (which are renamed to reflect the
> > > > > semantics) to support downgrade.": I am a bit confused on that. Are we
> > > > > writing different values to the existing fields? Then, we can't
> > > > > downgrade, right?
> > > > >
> > > > > 15. active-transaction-total-time-max : Would
> > > > > active-transaction-open-time-max be more intuitive? Also, could we
> > > > > include the full name (group, tags, etc)?
> > > > >
> > > > > 16. "transaction.two.phase.commit.enable The default would be ‘false’.
> > > > > If it’s ‘false’, 2PC functionality is disabled even if the ACL is set,
> > > > > clients that attempt to use this functionality would receive
> > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client
> > > > > to understand what the actual cause is.
> > > > >
> > > > > 17. completeTransaction(). We expect this to be only used during
> > > > > recovery. Could we document this clearly? Could we prevent it from being
> > > > > used incorrectly (e.g. throw an exception if the producer has called
> > > > > other methods like send())?
> > > > >
> > > > > 18. "either prepareTransaction was called or initTransaction(true) was
> > > > > called": "either" should be "neither"?
> > > > >
> > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
> > > > > situation where there could be multiple outstanding txns. The following
> > > > > is an example of a potential problem during recovery.
> > > > >    The last txn epoch in the external store is 41 when the app dies.
> > > > >    Instance1 is created for recovery.
> > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42,
> > > > > ongoingEpoch=41
> > > > >      2. (instance1) dies before completeTxn(41) can be called.
> > > > >    Instance2 is created for recovery.
> > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43,
> > > > > ongoingEpoch=42
> > > > >      4. (instance2) completeTxn(41) => abort
> > > > >    The first problem is that 41 now is aborted when it should be
> > > > > committed. The second one is that it's not clear who could abort epoch
> > > > > 42, which is still open.
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > <jols...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hey Artem,
> > > > > >
> > > > > > Thanks for the updates. I think what you say makes sense. I just
> > > > > > updated my KIP so I want to reconcile some of the changes we made,
> > > > > > especially with respect to the TransactionLogValue.
> > > > > >
> > > > > > Firstly, I believe tagged fields require a default value so that if
> > > > > > they are not filled, we return the default (and know that they were
> > > > > > empty). For my KIP, I proposed the default for producer ID tagged
> > > > > > fields should be -1. I was wondering if we could update the KIP to
> > > > > > include the default values for producer ID and epoch.
> > > > > >
> > > > > > Next, I noticed we decided to rename the fields. I guess that the
> > > > > > field "NextProducerId" in my KIP correlates to "ProducerId" in this
> > > > > > KIP. Is that correct? So we would have "TransactionProducerId" for the
> > > > > > non-tagged field and have "ProducerId" (NextProducerId) and
> > > > > > "PrevProducerId" as tagged fields in the final version after KIP-890
> > > > > > and KIP-936 are implemented. Is this correct? I think the tags will
> > > > > > need updating, but that is trivial.
> > > > > >
> > > > > > The final question I had was with respect to storing the new epoch. In
> > > > > > KIP-890 part 2 (epoch bumps) I think we concluded that we don't need
> > > > > > to store the epoch since we can interpret the previous epoch based on
> > > > > > the producer ID. But here we could call InitProducerId multiple times
> > > > > > and we only want the producer with the correct epoch to be able to
> > > > > > commit the transaction. Is that the correct reasoning for why we need
> > > > > > the epoch here but not the Prepare/Commit state?
> > > > > >
> > > > > > Thanks,
> > > > > > Justine
> > > > > >
> > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Justine,
> > > > > > >
> > > > > > > After thinking a bit about supporting atomic dual writes for Kafka +
> > > > > > > NoSQL database, I came to a conclusion that we do need to bump the
> > > > > > > epoch even with InitProducerId(keepPreparedTxn=true).  As I described
> > > > > > > in my previous email, we wouldn't need to bump the epoch to protect
> > > > > > > from zombies, so that reasoning is still true.  But we cannot protect
> > > > > > > from split-brain scenarios when two or more instances of a producer
> > > > > > > with the same transactional id try to produce at the same time.  The
> > > > > > > dual-write example for SQL databases (
> > > > > > > https://github.com/apache/kafka/pull/14231/files) doesn't have a
> > > > > > > split-brain problem because execution is protected by the update lock
> > > > > > > on the transaction state record; however NoSQL databases may not have
> > > > > > > this protection (I'll write an example for NoSQL database dual-write
> > > > > > > soon).
> > > > > > >
> > > > > > > In a nutshell, here is an example of a split-brain scenario:
> > > > > > >
> > > > > > >    1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
> > > > > > >    2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
> > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
> > > > > > >    4. (instance2) CommitTxn, this is considered a retry, so it got
> > > > > > >    epoch 43 as well
> > > > > > >    5. (instance1) Produce messageA w/sequence 1
> > > > > > >    6. (instance2) Produce messageB w/sequence 1, this is considered a
> > > > > > >    duplicate
> > > > > > >    7. (instance2) Produce messageC w/sequence 2
> > > > > > >    8. (instance1) Produce messageD w/sequence 2, this is considered a
> > > > > > >    duplicate
> > > > > > >
> > > > > > > Now if either of those commit the transaction, it would have a mix of
> > > > > > > messages from the two instances (messageA and messageC).  With the
> > > > > > > proper epoch bump, instance1 would get fenced at step 3.
> > > > > > >
> > > > > > > In order to update the epoch in InitProducerId(keepPreparedTxn=true)
> > > > > > > we need to preserve the ongoing transaction's epoch (and producerId,
> > > > > > > if the epoch overflows), because we'd need to make a correct decision
> > > > > > > when we compare the PreparedTxnState that we read from the database
> > > > > > > with the (producerId, epoch) of the ongoing transaction.
> > > > > > >
> > > > > > > I've updated the KIP with the following:
> > > > > > >
> > > > > > >    - Ongoing transaction now has 2 (producerId, epoch) pairs -- one
> > > > > > >    pair describes the ongoing transaction, the other pair describes
> > > > > > >    the expected epoch for operations on this transactional id
> > > > > > >    - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
> > > > > > >    - TransactionalLogValue now has 2 (producerId, epoch) pairs, the
> > > > > > >    new values added as tagged fields, so it's easy to downgrade
> > > > > > >    - Added a note about downgrade in the Compatibility section
> > > > > > >    - Added a rejected alternative
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits
> > > > > > > <alivsh...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi Justine,
> > > > > > > >
> > > > > > > > Thank you for the questions.  Currently (pre-KIP-939) we always
> > > > > > > > bump the epoch on InitProducerId and abort an ongoing transaction
> > > > > > > > (if any).  I expect this behavior will continue with KIP-890 as
> > > > > > > > well.
> > > > > > > >
> > > > > > > > With KIP-939 we need to support the case when the ongoing
> > > > > > > > transaction needs to be preserved when keepPreparedTxn=true.
> > > > > > > > Bumping epoch without aborting or committing a transaction is
> > > > > > > > tricky because epoch is a short value and it's easy to overflow.
> > > > > > > > Currently, the overflow case is handled by aborting the ongoing
> > > > > > > > transaction, which would send out transaction markers with
> > > > > > > > epoch=Short.MAX_VALUE to the partition leaders, which would fence
> > > > > > > > off any messages with the producer id that started the transaction
> > > > > > > > (they would have epoch that is less than Short.MAX_VALUE).  Then it
> > > > > > > > is safe to allocate a new producer id and use it in new
> > > > > > > > transactions.
> > > > > > > >
> > > > > > > > We could say that maybe when keepPreparedTxn=true we bump epoch
> > > > > > > > unless it leads to overflow, and don't bump epoch in the overflow
> > > > > > > > case.  I don't think it's a good solution because if it's not safe
> > > > > > > > to keep the same epoch when keepPreparedTxn=true, then we must
> > > > > > > > handle the epoch overflow case as well.  So either we should
> > > > > > > > convince ourselves that it's safe to keep the epoch and do it in
> > > > > > > > the general case, or we always bump the epoch and handle the
> > > > > > > > overflow.
> > > > > > > >
> > > > > > > > With KIP-890, we bump the epoch on every transaction commit /
> > > > > > > > abort.  This guarantees that even if
> > > > > > > > InitProducerId(keepPreparedTxn=true) doesn't increment epoch on the
> > > > > > > > ongoing transaction, the client will have to call commit or abort
> > > > > > > > to finish the transaction and will increment the epoch (and handle
> > > > > > > > epoch overflow, if needed).  If the ongoing transaction was in a
> > > > > > > > bad state and had some zombies waiting to arrive, the abort
> > > > > > > > operation would fence them because with KIP-890 every abort would
> > > > > > > > bump the epoch.
> > > > > > > >
> > > > > > > > We could also look at this from the following perspective.  With
> > > > > > > > KIP-890, zombies won't be able to cross transaction boundaries;
> > > > > > > > each transaction completion creates a boundary and any activity in
> > > > > > > > the past gets confined in the boundary.  Then data in any partition
> > > > > > > > would look like this:
> > > > > > > >
> > > > > > > > 1. message1, epoch=42
> > > > > > > > 2. message2, epoch=42
> > > > > > > > 3. message3, epoch=42
> > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > >
> > > > > > > > Now if we inject steps 3a and 3b like this:
> > > > > > > >
> > > > > > > > 1. message1, epoch=42
> > > > > > > > 2. message2, epoch=42
> > > > > > > > 3. message3, epoch=42
> > > > > > > > 3a. crash
> > > > > > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > >
> > > > > > > > The invariant still holds even with steps 3a and 3b -- whatever
> > > > > > > > activity was in the past will get confined in the past with
> > > > > > > > mandatory abort / commit that must follow
> > > > > > > > InitProducerId(keepPreparedTxn=true).
> > > > > > > >
> > > > > > > > So KIP-890 provides the proper isolation between transactions, so
> > > > > > > > injecting crash + InitProducerId(keepPreparedTxn=true) into the
> > > > > > > > transaction sequence is safe from the zombie protection
> > > > > > > > perspective.
> > > > > > > >
> > > > > > > > That said, I'm still thinking about it and looking for cases that
> > > > > > > > might break because we don't bump epoch when
> > > > > > > > InitProducerId(keepPreparedTxn=true); if such cases exist, we'll
> > > > > > > > need to develop the logic to handle epoch overflow for ongoing
> > > > > > > > transactions.
> > > > > > > >
> > > > > > > > -Artem
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > >> Hey Artem,
> > > > > > > >>
> > > > > > > >> Thanks for the KIP. I had a question about epoch bumping.
> > > > > > > >>
> > > > > > > >> Previously when we send an InitProducerId request on Producer
> > > > > > > >> startup, we bump the epoch and abort the transaction. Is it
> > > > > > > >> correct to assume that we will still bump the epoch, but just not
> > > > > > > >> abort the transaction?
> > > > > > > >> If we still bump the epoch in this case, how does this interact
> > > > > > > >> with KIP-890 where we also bump the epoch on every transaction. (I
> > > > > > > >> think this means that we may skip epochs and the data itself will
> > > > > > > >> all have the same epoch)
> > > > > > > >>
> > > > > > > >> I may have follow ups depending on the answer to this. :)
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Justine
> > > > > > > >>
> > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > > > > > >> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Alex,
> > > > > > > >> >
> > > > > > > >> > Thank you for your questions.
> > > > > > > >> >
> > > > > > > >> > > the purpose of having broker-level
> > > > > > > >> > > transaction.two.phase.commit.enable
> > > > > > > >> >
> > > > > > > >> > The thinking is that 2PC is a bit of an advanced construct so
> > > > > > > >> > enabling 2PC in a Kafka cluster should be an explicit decision.
> > > > > > > >> > If it is set to 'false', InitProducerId (and initTransactions)
> > > > > > > >> > would return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > > > > > >> >
> > > > > > > >> > > WDYT about adding an AdminClient method that returns the state
> > > > > > > >> > > of transaction.two.phase.commit.enable
> > > > > > > >> >
> > > > > > > >> > I wonder if the client could just try to use 2PC and then
> > > > > > > >> > handle the error (e.g. if it needs to fall back to ordinary
> > > > > > > >> > transactions).  This way it could uniformly handle cases when
> > > > > > > >> > Kafka cluster doesn't support 2PC completely and cases when 2PC
> > > > > > > >> > is restricted to certain users.  We could also expose this
> > > > > > > >> > config in describeConfigs, if the fallback approach doesn't work
> > > > > > > >> > for some scenarios.
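> > > > > > > >> >
> > > > > > > >> > A sketch of that fallback (the exception type is my assumption
> > > > > > > >> > based on the error code mentioned above):
> > > > > > > >> >
> > > > > > > >> >     props.put("transaction.two.phase.commit.enable", "true");
> > > > > > > >> >     Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
> > > > > > > >> >     try {
> > > > > > > >> >         producer.initTransactions(true);
> > > > > > > >> >     } catch (TransactionalIdAuthorizationException e) {
> > > > > > > >> >         // 2PC disabled or not authorized: recreate the producer
> > > > > > > >> >         // without 2PC and use ordinary transactions.
> > > > > > > >> >         producer.close();
> > > > > > > >> >         props.put("transaction.two.phase.commit.enable", "false");
> > > > > > > >> >         producer = new KafkaProducer<>(props);
> > > > > > > >> >         producer.initTransactions();
> > > > > > > >> >     }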
> > > > > > > >> >
> > > > > > > >> > -Artem
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> > > > > > > >> > <asorokou...@confluent.io.invalid> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi Artem,
> > > > > > > >> > >
> > > > > > > >> > > Thanks for publishing this KIP!
> > > > > > > >> > >
> > > > > > > >> > > Can you please clarify the purpose of having broker-level
> > > > > > > >> > > transaction.two.phase.commit.enable config in addition to the
> > > > > > > >> > > new ACL? If the brokers are configured with
> > > > > > > >> > > transaction.two.phase.commit.enable=false, at what point will a
> > > > > > > >> > > client configured with transaction.two.phase.commit.enable=true
> > > > > > > >> > > fail? Will it happen at KafkaProducer#initTransactions?
> > > > > > > >> > >
> > > > > > > >> > > WDYT about adding an AdminClient method that returns the state
> > > > > > > >> > > of transaction.two.phase.commit.enable? This way, clients would
> > > > > > > >> > > know in advance if 2PC is enabled on the brokers.
> > > > > > > >> > >
> > > > > > > >> > > Best,
> > > > > > > >> > > Alex
> > > > > > > >> > >
> > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover
> > > > > > > >> > > <roger.hoo...@gmail.com> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Other than supporting multiplexing transactional streams on
> > > > > > > >> > > > a single producer, I don't see how to improve it.
> > > > > > > >> > > >
> > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > > > > > > >> > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hi Roger,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thank you for summarizing the cons.  I agree and I'm
> > > > > > > >> > > > > curious what would be the alternatives to solve these
> > > > > > > >> > > > > problems better and if they can be incorporated into this
> > > > > > > >> > > > > proposal (or built independently in addition to or on top
> > > > > > > >> > > > > of this proposal).  E.g. one potential extension we
> > > > > > > >> > > > > discussed earlier in the thread could be multiplexing
> > > > > > > >> > > > > logical transactional "streams" with a single producer.
> > > > > > > >> > > > >
> > > > > > > >> > > > > -Artem
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover
> > > > > > > >> > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Thanks.  I like that you're moving Kafka toward
> > > > > > > >> > > > > > supporting this dual-write pattern.  Each use case needs
> > > > > > > >> > > > > > to consider the tradeoffs.  You already summarized the
> > > > > > > >> > > > > > pros very well in the KIP.  I would summarize the cons as
> > > > > > > >> > > > > > follows:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > - you sacrifice availability - each write requires both
> > > > > > > >> > > > > > DB and Kafka to be available, so I think your overall
> > > > > > > >> > > > > > application availability is (1 - p(DB is unavailable)) *
> > > > > > > >> > > > > > (1 - p(Kafka is unavailable)), assuming the two fail
> > > > > > > >> > > > > > independently.
> > > > > > > >> > > > > > - latency will be higher and throughput lower - each
> > > > > > > >> > > > > > write requires both writes to DB and Kafka while holding
> > > > > > > >> > > > > > an exclusive lock in DB.
> > > > > > > >> > > > > > - you need to create a producer per unit of concurrency
> > > > > > > >> > > > > > in your app which has some overhead in the app and Kafka
> > > > > > > >> > > > > > side (number of connections, poor batching).  I assume
> > > > > > > >> > > > > > the producers would need to be configured for low latency
> > > > > > > >> > > > > > (linger.ms=0)
> > > > > > > >> > > > > > - there's some complexity in managing stable
> > > > > > > >> > > > > > transactional ids for each producer/concurrency unit in
> > > > > > > >> > > > > > your application.  With k8s deployment, you may need to
> > > > > > > >> > > > > > switch to something like a StatefulSet that gives each
> > > > > > > >> > > > > > pod a stable identity across restarts.  On top of that
> > > > > > > >> > > > > > pod identity, which you can use as a prefix, you then
> > > > > > > >> > > > > > assign unique transactional ids to each concurrency unit
> > > > > > > >> > > > > > (thread/goroutine).
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > > > > > >> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > > Hi Roger,
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Thank you for the feedback.  You make a very good
> > > > > > > >> > > > > > > point that we also discussed internally.  Adding
> > > > > > > >> > > > > > > support for multiple concurrent transactions in one
> > > > > > > >> > > > > > > producer could be valuable but it seems to be a fairly
> > > > > > > >> > > > > > > large and independent change that would deserve a
> > > > > > > >> > > > > > > separate KIP.  If such support is added we could modify
> > > > > > > >> > > > > > > 2PC functionality to incorporate that.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Maybe not too bad but a bit of pain to manage these
> > > > > > > >> > > > > > > > ids inside each process and across all application
> > > > > > > >> > > > > > > > processes.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I'm not sure if supporting multiple transactions in one
> > > > > > > >> > > > > > > producer would make id management simpler: we'd need to
> > > > > > > >> > > > > > > store a piece of data per transaction, so whether it's
> > > > > > > >> > > > > > > N producers with a single transaction or N transactions
> > > > > > > >> > > > > > > with a single producer, it's still roughly the same
> > > > > > > >> > > > > > > amount of data to manage.  In fact, managing
> > > > > > > >> > > > > > > transactional ids (current proposal) might be easier,
> > > > > > > >> > > > > > > because the id is controlled by the application and it
> > > > > > > >> > > > > > > knows how to complete the transaction after crash /
> > > > > > > >> > > > > > > restart; while a TID would be generated by Kafka and
> > > > > > > >> > > > > > > that would create a question of starting Kafka
> > > > > > > >> > > > > > > transaction, but not saving its TID and then crashing,
> > > > > > > >> > > > > > > then figuring out which transactions to abort and etc.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > > > > > > >> > > > > > > > slot in the application
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > This is a very valid concern.  Maybe we'd need to have
> > > > > > > >> > > > > > > some multiplexing of transactional logical "streams"
> > > > > > > >> > > > > > > over the same connection.  Seems like a separate KIP,
> > > > > > > >> > > > > > > though.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > > > > > > >> > > > > > > > model per application process?
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > That's a fair assessment.  Not necessarily exactly
> > > > > > > >> > > > > > > single-threaded per application, but a single producer
> > > > > > > >> > > > > > > per thread model (i.e. an application could have a pool
> > > > > > > >> > > > > > > of threads + producers to increase concurrency).
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > -Artem
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover
> > > > > > > >> > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Artem,
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Thanks for the reply.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > If I understand correctly, Kafka does not support
> > > > > > > >> > > > > > > > concurrent transactions from the same producer
> > > > > > > >> > > > > > > > (transactional id).  I think this means that
> > > > > > > >> > > > > > > > applications that want to support in-process
> > > > > > > >> > > > > > > > concurrency (say thread-level concurrency with
> > > > > > > >> > > > > > > > row-level DB locking) would need to manage separate
> > > > > > > >> > > > > > > > transactional ids and producers per thread and then
> > > > > > > >> > > > > > > > store txn state accordingly.  The potential usability
> > > > > > > >> > > > > > > > downsides I see are
> > > > > > > >> > > > > > > > 1) managing a set of transactional ids for each
> > > > > > > >> > > > > > > > application process that scales up to its max
> > > > > > > >> > > > > > > > concurrency.  Maybe not too bad but a bit of pain to
> > > > > > > >> > > > > > > > manage these ids inside each process and across all
> > > > > > > >> > > > > > > > application processes.
> > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > > > > > > >> > > > > > > > slot in the application - this could create a lot
> > > > > > > >> > > > > > > > more producers and resultant connections to Kafka
> > > > > > > >> > > > > > > > than the typical model of a single producer per
> > > > > > > >> > > > > > > > process.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > > > > > > >> > > > > > > > model per application process?
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Thanks,
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Roger
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > > > > > >> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > > Hi Roger, Arjun,
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > Thank you for the questions.
> > > > > > > >> > > > > > > > > > It looks like the application must have stable
> > > > > > > >> > > > > > > > > > transactional ids over time?
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > The transactional id should uniquely identify a
> > > > > > > >> > > > > > > > > producer instance and needs to be stable across the
> > > > > > > >> > > > > > > > > restarts.  If the transactional id is not stable
> > > > > > > >> > > > > > > > > across restarts, then zombie messages from a
> > > > > > > >> > > > > > > > > previous incarnation of the producer may violate
> > > > > > > >> > > > > > > > > atomicity.  If there are 2 producer instances
> > > > > > > >> > > > > > > > > concurrently producing data with the same
> > > > > > > >> > > > > > > > > transactional id, they are going to constantly
> > > > > > > >> > > > > > > > > fence each other and most likely make little or no
> > > > > > > >> > > > > > > > > progress.
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > The name might be a little bit confusing as it may
> > > > > > > >> > > > > > > > > be mistaken for a transaction id / TID that
> > > > > > > >> > > > > > > > > uniquely identifies every transaction.  The name
> > > > > > > >> > > > > > > > > and the semantics were defined in the original
> > > > > > > >> > > > > > > > > exactly-once-semantics (EoS) proposal (
> > > > > > > >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > >> > > > > > > > > ) and KIP-939 just builds on top of that.
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > I'm curious to understand what happens if the
> > > > > > > >> > > > > > > > > > producer dies, and does not come up and recover
> > > > > > > >> > > > > > > > > > the pending transaction within the transaction
> > > > > > > >> > > > > > > > > > timeout interval.
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > If the producer / application never comes back, the
> > > > > > > >> > > > > > > > > transaction will remain in prepared (a.k.a.
> > > > > > > >> > > > > > > > > "in-doubt") state until an operator forcefully
> > > > > > > >> > > > > > > > > terminates the transaction.  That's why there is a
> > > > > > > >> > > > > > > > > new ACL defined in this proposal -- this
> > > > > > > >> > > > > > > > > functionality should only be provided to
> > > > > > > >> > > > > > > > > applications that implement proper recovery logic.
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > -Artem
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > > > > >> > > > > > > > > <arjun.sat...@gmail.com> wrote:
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > Hello Artem,
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Thanks for the KIP.
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > I have the same question as Roger on concurrent
> > > > > > > >> > > > > > > > > > writes, and an additional one on consumer
> > > > > > > >> > > > > > > > > > behavior. Typically, transactions will timeout if
> > > > > > > >> > > > > > > > > > not committed within some time interval. With the
> > > > > > > >> > > > > > > > > > proposed changes in this KIP, consumers cannot
> > > > > > > >> > > > > > > > > > consume past the ongoing transaction. I'm curious
> > > > > > > >> > > > > > > > > > to understand what happens if the producer dies,
> > > > > > > >> > > > > > > > > > and does not come up and recover the pending
> > > > > > > >> > > > > > > > > > transaction within the transaction timeout
> > > > > > > >> > > > > > > > > > interval. Or are we saying that when used in this
> > > > > > > >> > > > > > > > > > 2PC context, we should configure these transaction
> > > > > > > >> > > > > > > > > > timeouts to very large durations?
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Thanks in advance!
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Best,
> > > > > > > >> > > > > > > > > > Arjun
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > > > >> > > > > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Hi Artem,
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Thanks for writing this KIP.  Can you clarify
> > > > > > > >> > > > > > > > > > > the requirements a bit more for managing
> > > > > > > >> > > > > > > > > > > transaction state?  It looks like the
> > > > > > > >> > > > > > > > > > > application must have stable transactional ids
> > > > > > > >> > > > > > > > > > > over time?  What is the granularity of those
> > > > > > > >> > > > > > > > > > > ids and producers?  Say the application is a
> > > > > > > >> > > > > > > > > > > multi-threaded Java web server, can/should all
> > > > > > > >> > > > > > > > > > > the concurrent threads share a transactional id
> > > > > > > >> > > > > > > > > > > and producer?  That doesn't seem right to me
> > > > > > > >> > > > > > > > > > > unless the application is using global DB locks
> > > > > > > >> > > > > > > > > > > that serialize all requests.  Instead, if the
> > > > > > > >> > > > > > > > > > > application uses row-level DB locks, there
> > > > > > > >> > > > > > > > > > > could be multiple, concurrent, independent txns
> > > > > > > >> > > > > > > > > > > happening in the same JVM, so it seems like the
> > > > > > > >> > > > > > > > > > > granularity of managing transactional ids and
> > > > > > > >> > > > > > > > > > > txn state needs to line up with the granularity
> > > > > > > >> > > > > > > > > > > of the DB locking.
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Does that make sense or am I misunderstanding?
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Roger
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > > >> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > Hello,
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > This is a discussion thread for
> > > > > > > >> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > >> > > > > > > > > > > > .
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > The KIP proposes extending Kafka transaction
> > > > > > > >> > > > > > > > > > > > support (that already uses 2PC under the
> > > > > > > >> > > > > > > > > > > > hood) to enable atomicity of dual writes to
> > > > > > > >> > > > > > > > > > > > Kafka and an external database, and helps to
> > > > > > > >> > > > > > > > > > > > fix a long standing Flink issue.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > An example of code that uses the dual write
> > > > > > > >> > > > > > > > > > > > recipe with JDBC and should work for most SQL
> > > > > > > >> > > > > > > > > > > > databases is here
> > > > > > > >> > > > > > > > > > > > https://github.com/apache/kafka/pull/14231.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > The FLIP for the sister fix in Flink is here
> > > > > > > >> > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > -Artem
