Hi Jun,

> 20. For Flink usage, it seems that the APIs used to abort and commit a
> prepared txn are not symmetric.

For Flink, the expectation is that it calls .commitTransaction or
.abortTransaction directly and never needs to deal with PreparedTxnState:
the outcome is determined by Flink's job manager, not by comparing
PreparedTxnState.  So for Flink, if the Kafka sink crashes and restarts,
there are 2 cases:

1. Transaction is not prepared.  In that case just call
producer.initTransactions(false) and then start transactions as needed.
2. Transaction is prepared.  In that case call
producer.initTransactions(true) and wait for the decision from the job
manager.  Note that it's not a given that the transaction will get
committed; the decision could also be an abort.
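
To make the two cases concrete, here is a rough sketch of what the sink's
recovery path might look like.  It is not actual Flink or Kafka code:
TwoPhaseProducer, RecordingProducer, Decision and recover() are made-up
names, the producer is an in-memory fake that only records calls, and the
job manager's decision is passed in as an argument instead of being waited
for.

```java
// Hypothetical stand-in for the producer API discussed in this thread; the
// real KafkaProducer is not used so that the sketch stays self-contained.
interface TwoPhaseProducer {
    void initTransactions(boolean keepPreparedTxn);
    void commitTransaction();
    void abortTransaction();
}

// In-memory fake that only records which calls were made, in order.
class RecordingProducer implements TwoPhaseProducer {
    final StringBuilder calls = new StringBuilder();

    public void initTransactions(boolean keepPreparedTxn) {
        calls.append("init(").append(keepPreparedTxn).append(");");
    }

    public void commitTransaction() {
        calls.append("commit;");
    }

    public void abortTransaction() {
        calls.append("abort;");
    }
}

class FlinkSinkRecoverySketch {
    // Outcome chosen by the job manager (hypothetical type).
    enum Decision { COMMIT, ABORT }

    // wasPrepared comes from Flink's own state, not from Kafka.
    // decision is only consulted when the transaction was prepared.
    static void recover(TwoPhaseProducer producer, boolean wasPrepared, Decision decision) {
        if (!wasPrepared) {
            // Case 1: nothing to preserve; any ongoing transaction is aborted.
            producer.initTransactions(false);
            return;
        }
        // Case 2: keep the prepared transaction, then apply the decision.
        producer.initTransactions(true);
        if (decision == Decision.COMMIT) {
            producer.commitTransaction();
        } else {
            producer.abortTransaction();
        }
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        recover(producer, true, Decision.ABORT);
        System.out.println(producer.calls);
    }
}
```

Note that PreparedTxnState does not appear anywhere in this path, which is
the point of the asymmetry with the dual-write recipe.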

> 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> could use a negative timeout in the record to indicate 2PC?

-1 sounds good, updated.
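
Roughly, the idea is that the timeout stored in the record doubles as the
2PC marker, so Integer.MAX_VALUE stays an ordinary (if very long) timeout.
The sketch below is only an illustration of that idea: the class, constant
and method names are made up, and the expiry comparison is an assumption,
not the coordinator's actual logic.

```java
class TxnTimeoutSketch {
    // Sentinel stored in the record for 2PC transactions.  The -1 value is
    // from this thread; the constant name is hypothetical.
    static final int TWO_PC_TIMEOUT_SENTINEL = -1;

    static boolean isTwoPhaseCommit(int transactionTimeoutMs) {
        return transactionTimeoutMs == TWO_PC_TIMEOUT_SENTINEL;
    }

    // True if the transaction should be auto-aborted because it timed out.
    static boolean shouldAutoAbort(int transactionTimeoutMs, long txnStartMs, long nowMs) {
        if (isTwoPhaseCommit(transactionTimeoutMs)) {
            // 2PC transactions are never aborted on timeout.
            return false;
        }
        return nowMs - txnStartMs >= transactionTimeoutMs;
    }

    public static void main(String[] args) {
        // A regular transaction with the largest possible timeout still expires.
        System.out.println(shouldAutoAbort(Integer.MAX_VALUE, 0L, Integer.MAX_VALUE));
        // A 2PC transaction does not, no matter how old it is.
        System.out.println(shouldAutoAbort(TWO_PC_TIMEOUT_SENTINEL, 0L, Long.MAX_VALUE));
    }
}
```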

> 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> both?

I think of producer.initTransactions() as the implementation behind
adminClient.forceTerminateTransaction(transactionalId).

> 31. "This would flush all the pending messages and transition the producer

Updated the KIP to clarify that IllegalStateException will be thrown.
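
As an illustration of the intended behavior (a toy state machine, not the
real producer internals; the class, the state names and the error messages
are all made up), send() in the prepared state would be rejected like this:

```java
class PreparedStateSketch {
    enum State { READY, IN_TRANSACTION, PREPARED }

    private State state = State.READY;

    State state() {
        return state;
    }

    void beginTransaction() {
        requireState(State.READY, "beginTransaction()");
        state = State.IN_TRANSACTION;
    }

    void send(String record) {
        // Only allowed while the transaction is open and not yet prepared.
        requireState(State.IN_TRANSACTION, "send()");
        // A real producer would buffer and transmit the record here.
    }

    void prepareTransaction() {
        requireState(State.IN_TRANSACTION, "prepareTransaction()");
        state = State.PREPARED;
    }

    void commitTransaction() {
        finish("commitTransaction()");
    }

    void abortTransaction() {
        finish("abortTransaction()");
    }

    // Commit and abort are allowed both before and after prepare.
    private void finish(String op) {
        if (state == State.READY) {
            throw new IllegalStateException(op + " called with no transaction");
        }
        state = State.READY;
    }

    private void requireState(State expected, String op) {
        if (state != expected) {
            throw new IllegalStateException(op + " is not allowed in state " + state);
        }
    }

    public static void main(String[] args) {
        PreparedStateSketch producer = new PreparedStateSketch();
        producer.beginTransaction();
        producer.send("message-1");
        producer.prepareTransaction();
        try {
            producer.send("message-2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        producer.commitTransaction();
    }
}
```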

-Artem


On Mon, Feb 5, 2024 at 2:22 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. For Flink usage, it seems that the APIs used to abort and commit a
> prepared txn are not symmetric.
> To abort, the app will just call
>   producer.initTransactions(false)
>
> To commit, the app needs to call
>   producer.initTransactions(true)
>   producer.completeTransaction(preparedTxnState)
>
> Will this be a concern? For the dual-writer usage, both abort/commit use
> the same API.
>
> 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we could
> use a negative timeout in the record to indicate 2PC?
>
> 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> both?
>   producer.initTransactions(false)
>   adminClient.forceTerminateTransaction(transactionalId)
>
> 31. "This would flush all the pending messages and transition the producer
> into a mode where only .commitTransaction, .abortTransaction, or
> .completeTransaction could be called.  If the call is successful (all
> messages successfully got flushed to all partitions) the transaction is
> prepared."
>  If the producer calls send() in that state, what exception will the caller
> receive?
>
> Jun
>
>
> On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > >  Then, should we change the following in the example to use
> > InitProducerId(true) instead?
> >
> > We could. I just thought that it's good to make the example self-contained
> > by starting from a clean state.
> >
> > > Also, could Flink just follow the dual-write recipe?
> >
> > I think it would bring some unnecessary logic to Flink (or any other system
> > that already has a transaction coordinator and just wants to drive Kafka to
> > the desired state).  We could discuss it with Flink folks; the current
> > proposal was developed in collaboration with them.
> >
> > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > Integer.MAX_VALUE?
> >
> > The server would reject this for regular transactions; it only accepts
> > values that are <= *transaction.max.timeout.ms* (a broker config).
> >
> > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> > request to use the ongoing pid. ...
> >
> > Without 2PC there is no case where the pid could change between starting a
> > transaction and endTxn (InitProducerId would abort any ongoing
> > transaction).  With 2PC there is now a case where an InitProducerId can
> > change the pid without aborting the transaction, so we need to handle that.
> > I wouldn't say that the flow is different; rather, it's extended to handle
> > new cases.  The main principle is still the same -- for all operations we
> > use the latest "operational" pid and epoch known to the client; this way we
> > guarantee that we can fence zombie / split-brain clients by disrupting the
> > "latest known" pid + epoch progression.
> >
> > > 25. "We send out markers using the original ongoing transaction
> > ProducerId and ProducerEpoch" ...
> >
> > Updated.
> >
> > -Artem
> >
> > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. So for the dual-write recipe, we should always call
> > > InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> > > change the following in the example to use InitProducerId(true) instead?
> > > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > > Also, could Flink just follow the dual-write recipe? It's simpler if there
> > > is one way to solve the 2pc issue.
> > >
> > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > > Integer.MAX_VALUE?
> > >
> > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> > > request to use the ongoing pid. With 2pc, the coordinator now expects the
> > > endTxn request to use the next pid. So, the flow is different, right?
> > >
> > > 25. "We send out markers using the original ongoing transaction ProducerId
> > > and ProducerEpoch"
> > > We should use ProducerEpoch + 1 in the marker, right?
> > >
> > > Jun
> > >
> > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
> > > >
> > > > keepPreparedTxn=true informs the transaction coordinator that it
> should
> > > > keep the ongoing transaction, if any.  If the keepPreparedTxn=false,
> > then
> > > > any ongoing transaction is aborted (this is exactly the current
> > > behavior).
> > > > enable2Pc is a separate argument that is controlled by the
> > > > *transaction.two.phase.commit.enable* setting on the client.
> > > >
> > > > To start 2PC, the client just needs to set
> > > > *transaction.two.phase.commit.enable*=true in the config.  Then if
> the
> > > > client knows the status of the transaction upfront (in the case of
> > Flink,
> > > > Flink keeps the knowledge if the transaction is prepared in its own
> > > store,
> > > > so it always knows upfront), it can set keepPreparedTxn accordingly,
> > then
> > > > if the transaction was prepared, it'll be ready for the client to
> > > complete
> > > > the appropriate action; if the client doesn't have a knowledge that
> the
> > > > transaction is prepared, keepPreparedTxn is going to be false, in
> which
> > > > case we'll get to a clean state (the same way we do today).
> > > >
> > > > For the dual-write recipe, the client doesn't know upfront if the
> > > > transaction is prepared, this information is implicitly encoded
> > > > in the PreparedTxnState value that can be used to resolve the transaction
> > state.
> > > > In that case, keepPreparedTxn should always be true, because we don't
> > > know
> > > > upfront and we don't want to accidentally abort a committed
> > transaction.
> > > >
> > > > The forceTerminateTransaction call can just use
> keepPreparedTxn=false,
> > it
> > > > actually doesn't matter if it sets Enable2Pc flag.
> > > >
> > > > > 21. TransactionLogValue: Do we need some field to identify whether
> > this
> > > > is written for 2PC so that ongoing txn is never auto aborted?
> > > >
> > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> > > > enabled.  I've added a note to the KIP about this.
> > > >
> > > > > 22
> > > >
> > > > You're right it's a typo.  I fixed it as well as step 9 (REQUEST:
> > > > ProducerId=73, ProducerEpoch=MAX).
> > > >
> > > > > 23. It's a bit weird that Enable2Pc is driven by a config while
> > > > KeepPreparedTxn is from an API param ...
> > > >
> > > > The intent to use 2PC doesn't change from transaction to transaction,
> > but
> > > > the intent to keep prepared txn may change from transaction to
> > > > transaction.  In dual-write recipes the distinction is not clear, but
> > for
> > > > use cases where keepPreparedTxn value is known upfront (e.g. Flink)
> > it's
> > > > more prominent.  E.g. a Flink's Kafka sink operator could be deployed
> > > with
> > > > *transaction.two.phase.commit.enable*=true hardcoded in the image,
> but
> > > > keepPreparedTxn cannot be hardcoded in the image, because it depends
> on
> > > the
> > > > job manager's state.
> > > >
> > > > > 24
> > > >
> > > > The flow is actually going to be the same way as it is now -- the
> > "main"
> > > > producer id + epoch needs to be used in all operations to prevent
> > fencing
> > > > (it's sort of a common "header" in all RPC calls that follow the same
> > > > rules).  The ongoing txn info is just additional info for making a
> > > commit /
> > > > abort decision based on the PreparedTxnState from the DB.
> > > >
> > > > --Artem
> > > >
> > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the reply. A few more comments.
> > > > >
> > > > > 20. I am a bit confused by how we set keepPreparedTxn. From the
> KIP,
> > I
> > > > got
> > > > > the following (1) to start 2pc, we call
> > > > > InitProducerId(keepPreparedTxn=false); (2) when the producer fails
> > and
> > > > > needs to do recovery, it calls
> InitProducerId(keepPreparedTxn=true);
> > > (3)
> > > > > Admin.forceTerminateTransaction() calls
> > > > > InitProducerId(keepPreparedTxn=false).
> > > > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc
> > > > enabled,
> > > > > and there is an ongoing txn, should the server return an error to
> the
> > > > > InitProducerId request? If so, what would be the error code?
> > > > > 20.2 How do we distinguish between (1) and (3)? It's the same API
> > call
> > > > but
> > > > > (1) doesn't abort ongoing txn and (3) does.
> > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the
> > > ongoing
> > > > > txn. So, setting keepPreparedTxn to false to start 2pc seems
> counter
> > > > > intuitive.
> > > > >
> > > > > 21. TransactionLogValue: Do we need some field to identify whether
> > this
> > > > is
> > > > > written for 2PC so that ongoing txn is never auto aborted?
> > > > >
> > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
> > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
> > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
> > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > > > It seems in the above example, Epoch in RESPONSE should be MAX to
> > match
> > > > > NextProducerEpoch?
> > > > >
> > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > > > while KeepPreparedTxn is from an API param. Should we make them
> more
> > > > > consistent since they seem related?
> > > > >
> > > > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC
> > STATE:
> > > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73,
> > > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85,
> > > Epoch=0,
> > > > > When a commit request is sent, it uses the latest ProducerId and
> > > > > ProducerEpoch."
> > > > > The step where we use the next producerId to commit an old txn
> works,
> > > but
> > > > > can be confusing. It's going to be hard for people implementing
> this
> > > new
> > > > > client protocol to figure out when to use the current or the new
> > > > producerId
> > > > > in the EndTxnRequest. One potential way to improve this is to
> extend
> > > > > EndTxnRequest with a new field like expectedNextProducerId. Then we
> > can
> > > > > always use the old producerId in the existing field, but set
> > > > > expectedNextProducerId to bypass the fencing logic when needed.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Thank you for the comments.
> > > > > >
> > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
> > > > > >
> > > > > > I added a note that all combinations are valid.  Enable2Pc=false
> &
> > > > > > KeepPreparedTxn=true could be potentially useful for backward
> > > > > compatibility
> > > > > > with Flink, when the new version of Flink that implements FLIP-319
> > > tries
> > > > > to
> > > > > > work with a cluster that doesn't authorize 2PC.
> > > > > >
> > > > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
> > will
> > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > >
> > > > > > I added a note that they will be set to -1.  The client then will
> > > know
> > > > > that
> > > > > > there is no ongoing txn and .completeTransaction becomes a no-op
> > (but
> > > > > still
> > > > > > required before .send is enabled).
> > > > > >
> > > > > > > 12. ListTransactionsRequest related changes: It seems those are
> > > > already
> > > > > > covered by KIP-994?
> > > > > >
> > > > > > Removed from this KIP.
> > > > > >
> > > > > > > 13. TransactionalLogValue ...
> > > > > >
> > > > > > This is now updated to work on top of KIP-890.
> > > > > >
> > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
> > the
> > > > > > ongoing transaction ...
> > > > > >
> > > > > > This is now updated to work on top of KIP-890.
> > > > > >
> > > > > > > 15. active-transaction-total-time-max : ...
> > > > > >
> > > > > > Updated.
> > > > > >
> > > > > > > 16. "transaction.two.phase.commit.enable The default would be
> > > > ‘false’.
> > > > > > If it’s ‘false’, 2PC functionality is disabled even if the ACL is
> > set
> > > > ...
> > > > > >
> > > > > > Disabling 2PC effectively removes all authorization to use it,
> > hence
> > > I
> > > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be
> appropriate.
> > > > > >
> > > > > > Do you suggest using a different error code for 2PC authorization
> > vs
> > > > some
> > > > > > other authorization (e.g.
> > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED)
> > > > or a
> > > > > > different code for disabled vs. unauthorised (e.g.
> > > > > > TWO_PHASE_COMMIT_DISABLED) or both?
> > > > > >
> > > > > > > 17. completeTransaction(). We expect this to be only used
> during
> > > > > > recovery.
> > > > > >
> > > > > > It can also be used if, say, a commit to the database fails and
> the
> > > > > result
> > > > > > is inconclusive, e.g.
> > > > > >
> > > > > > 1. Begin DB transaction
> > > > > > 2. Begin Kafka transaction
> > > > > > 3. Prepare Kafka transaction
> > > > > > 4. Commit DB transaction
> > > > > > 5. The DB commit fails, figure out the state of the transaction
> by
> > > > > reading
> > > > > > the PreparedTxnState from DB
> > > > > > 6. Complete Kafka transaction with the PreparedTxnState.
> > > > > >
> > > > > > > 18. "either prepareTransaction was called or
> > initTransaction(true)
> > > > was
> > > > > > called": "either" should be "neither"?
> > > > > >
> > > > > > Updated.
> > > > > >
> > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates
> a
> > > > > > situation ...
> > > > > >
> > > > > > InitProducerId only bumps the producer epoch, the ongoing
> > transaction
> > > > > epoch
> > > > > > stays the same, no matter how many times the InitProducerId is
> > called
> > > > > > before the transaction is completed.  Eventually the epoch may
> > > > overflow,
> > > > > > and then a new producer id would be allocated, but the ongoing
> > > > > transaction
> > > > > > producer id would stay the same.
> > > > > >
> > > > > > I've added a couple examples in the KIP (
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > > > )
> > > > > > that walk through some scenarios and show how the state is
> changed.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi, Artem,
> > > > > > >
> > > > > > > Thanks for the KIP. A few comments below.
> > > > > > >
> > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn in
> > > > > > > InitProducerId, it would be useful to document a bit more
> detail
> > on
> > > > > what
> > > > > > > values are set under what cases. For example, are all four
> > > > combinations
> > > > > > > valid?
> > > > > > >
> > > > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
> > will
> > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > > >
> > > > > > > 12. ListTransactionsRequest related changes: It seems those are
> > > > already
> > > > > > > covered by KIP-994?
> > > > > > >
> > > > > > > 13. TransactionalLogValue: Could we name TransactionProducerId
> > and
> > > > > > > ProducerId better? It's not clear from the name which is for
> > which.
> > > > > > >
> > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
> > the
> > > > > > ongoing
> > > > > > > transaction is going to be written instead of the existing
> > > ProducerId
> > > > > and
> > > > > > > ProducerEpoch fields (which are renamed to reflect the
> semantics)
> > > to
> > > > > > > support downgrade.": I am a bit confused on that. Are we
> writing
> > > > > > different
> > > > > > > values to the existing fields? Then, we can't downgrade, right?
> > > > > > >
> > > > > > > 15. active-transaction-total-time-max : Would
> > > > > > > active-transaction-open-time-max be more intuitive? Also, could
> > we
> > > > > > include
> > > > > > > the full name (group, tags, etc)?
> > > > > > >
> > > > > > > 16. "transaction.two.phase.commit.enable The default would be
> > > > ‘false’.
> > > > > > If
> > > > > > > it’s ‘false’, 2PC functionality is disabled even if the ACL is
> > set,
> > > > > > clients
> > > > > > > that attempt to use this functionality would receive
> > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the
> > > > client
> > > > > to
> > > > > > > understand what the actual cause is.
> > > > > > >
> > > > > > > 17. completeTransaction(). We expect this to be only used
> during
> > > > > > recovery.
> > > > > > > Could we document this clearly? Could we prevent it from being
> > used
> > > > > > > incorrectly (e.g. throw an exception if the producer has called
> > > other
> > > > > > > methods like send())?
> > > > > > >
> > > > > > > 18. "either prepareTransaction was called or
> > initTransaction(true)
> > > > was
> > > > > > > called": "either" should be "neither"?
> > > > > > >
> > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates
> a
> > > > > > situation
> > > > > > > where there could be multiple outstanding txns. The following
> is
> > an
> > > > > > example
> > > > > > > of a potential problem during recovery.
> > > > > > >    The last txn epoch in the external store is 41 when the app
> > > dies.
> > > > > > >    Instance1 is created for recovery.
> > > > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true),
> > epoch=42,
> > > > > > > ongoingEpoch=41
> > > > > > >      2. (instance1) dies before completeTxn(41) can be called.
> > > > > > >    Instance2 is created for recovery.
> > > > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true),
> > epoch=43,
> > > > > > > ongoingEpoch=42
> > > > > > >      4. (instance2) completeTxn(41) => abort
> > > > > > >    The first problem is that 41 now is aborted when it should
> be
> > > > > > committed.
> > > > > > > The second one is that it's not clear who could abort epoch 42,
> > > which
> > > > > is
> > > > > > > still open.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > > <jols...@confluent.io.invalid
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Artem,
> > > > > > > >
> > > > > > > > Thanks for the updates. I think what you say makes sense. I
> > just
> > > > > > updated
> > > > > > > my
> > > > > > > > KIP so I want to reconcile some of the changes we made
> > especially
> > > > > with
> > > > > > > > respect to the TransactionLogValue.
> > > > > > > >
> > > > > > > > Firstly, I believe tagged fields require a default value so
> > that
> > > if
> > > > > > they
> > > > > > > > are not filled, we return the default (and know that they
> were
> > > > > empty).
> > > > > > > For
> > > > > > > > my KIP, I proposed the default for producer ID tagged fields
> > > should
> > > > > be
> > > > > > > -1.
> > > > > > > > I was wondering if we could update the KIP to include the
> > default
> > > > > > values
> > > > > > > > for producer ID and epoch.
> > > > > > > >
> > > > > > > > Next, I noticed we decided to rename the fields. I guess that
> > the
> > > > > field
> > > > > > > > "NextProducerId" in my KIP correlates to "ProducerId" in this
> > > KIP.
> > > > Is
> > > > > > > that
> > > > > > > > correct? So we would have "TransactionProducerId" for the
> > > > non-tagged
> > > > > > > field
> > > > > > > > and have "ProducerId" (NextProducerId) and "PrevProducerId"
> as
> > > > tagged
> > > > > > > > fields in the final version after KIP-890 and KIP-936 are
> > > implemented.
> > > > > Is
> > > > > > > this
> > > > > > > > correct? I think the tags will need updating, but that is
> > > trivial.
> > > > > > > >
> > > > > > > > The final question I had was with respect to storing the new
> > > epoch.
> > > > > In
> > > > > > > > KIP-890 part 2 (epoch bumps) I think we concluded that we
> don't
> > > > need
> > > > > to
> > > > > > > > store the epoch since we can interpret the previous epoch
> based
> > > on
> > > > > the
> > > > > > > > producer ID. But here we could call the InitProducerId
> multiple
> > > > times
> > > > > > and
> > > > > > > > we only want the producer with the correct epoch to be able
> to
> > > > commit
> > > > > > the
> > > > > > > > transaction. Is that the correct reasoning for why we need
> > epoch
> > > > here
> > > > > > but
> > > > > > > > not the Prepare/Commit state?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Justine
> > > > > > > >
> > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Justine,
> > > > > > > > >
> > > > > > > > > After thinking a bit about supporting atomic dual writes
> for
> > > > Kafka
> > > > > +
> > > > > > > > NoSQL
> > > > > > > > > database, I came to a conclusion that we do need to bump
> the
> > > > epoch
> > > > > > even
> > > > > > > > > with InitProducerId(keepPreparedTxn=true).  As I described
> in
> > > my
> > > > > > > previous
> > > > > > > > > email, we wouldn't need to bump the epoch to protect from
> > > zombies
> > > > > so
> > > > > > > that
> > > > > > > > > reasoning is still true.  But we cannot protect from
> > > split-brain
> > > > > > > > scenarios
> > > > > > > > > when two or more instances of a producer with the same
> > > > > transactional
> > > > > > id
> > > > > > > > try
> > > > > > > > > to produce at the same time.  The dual-write example for
> SQL
> > > > > > databases
> > > > > > > (
> > > > > > > > > https://github.com/apache/kafka/pull/14231/files) doesn't
> > > have a
> > > > > > > > > split-brain problem because execution is protected by the
> > > update
> > > > > lock
> > > > > > > on
> > > > > > > > > the transaction state record; however NoSQL databases may
> not
> > > > have
> > > > > > this
> > > > > > > > > protection (I'll write an example for NoSQL database
> > dual-write
> > > > > > soon).
> > > > > > > > >
> > > > > > > > > In a nutshell, here is an example of a split-brain
> scenario:
> > > > > > > > >
> > > > > > > > >    1. (instance1) InitProducerId(keepPreparedTxn=true), got
> > > > > epoch=42
> > > > > > > > >    2. (instance2) InitProducerId(keepPreparedTxn=true), got
> > > > > epoch=42
> > > > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
> > > > > > > > >    4. (instance2) CommitTxn, this is considered a retry, so
> > it
> > > > got
> > > > > > > epoch
> > > > > > > > 43
> > > > > > > > >    as well
> > > > > > > > >    5. (instance1) Produce messageA w/sequence 1
> > > > > > > > >    6. (instance2) Produce messageB w/sequence 1, this is
> > > > > considered a
> > > > > > > > >    duplicate
> > > > > > > > >    7. (instance2) Produce messageC w/sequence 2
> > > > > > > > >    8. (instance1) Produce messageD w/sequence 2, this is
> > > > > considered a
> > > > > > > > >    duplicate
> > > > > > > > >
> > > > > > > > > Now if either of those commits the transaction, it would
> have
> > a
> > > > mix
> > > > > of
> > > > > > > > > messages from the two instances (messageA and messageC).
> > With
> > > > the
> > > > > > > proper
> > > > > > > > > epoch bump, instance1 would get fenced at step 3.
> > > > > > > > >
> > > > > > > > > In order to update epoch in
> > > InitProducerId(keepPreparedTxn=true)
> > > > we
> > > > > > > need
> > > > > > > > to
> > > > > > > > > preserve the ongoing transaction's epoch (and producerId,
> if
> > > the
> > > > > > epoch
> > > > > > > > > overflows), because we'd need to make a correct decision
> when
> > > we
> > > > > > > compare
> > > > > > > > > the PreparedTxnState that we read from the database with
> the
> > > > > > > (producerId,
> > > > > > > > > epoch) of the ongoing transaction.
> > > > > > > > >
> > > > > > > > > I've updated the KIP with the following:
> > > > > > > > >
> > > > > > > > >    - Ongoing transaction now has 2 (producerId, epoch)
> pairs
> > --
> > > > one
> > > > > > > pair
> > > > > > > > >    describes the ongoing transaction, the other pair
> > describes
> > > > > > expected
> > > > > > > > > epoch
> > > > > > > > >    for operations on this transactional id
> > > > > > > > >    - InitProducerIdResponse now returns 2 (producerId,
> epoch)
> > > > pairs
> > > > > > > > >    - TransactionalLogValue now has 2 (producerId, epoch)
> > pairs,
> > > > the
> > > > > > new
> > > > > > > > >    values added as tagged fields, so it's easy to downgrade
> > > > > > > > >    - Added a note about downgrade in the Compatibility
> > section
> > > > > > > > >    - Added a rejected alternative
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <
> > > > > > alivsh...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Justine,
> > > > > > > > > >
> > > > > > > > > > Thank you for the questions.  Currently (pre-KIP-939) we
> > > always
> > > > > > bump
> > > > > > > > the
> > > > > > > > > > epoch on InitProducerId and abort an ongoing transaction
> > (if
> > > > > > any).  I
> > > > > > > > > > expect this behavior will continue with KIP-890 as well.
> > > > > > > > > >
> > > > > > > > > > With KIP-939 we need to support the case when the ongoing
> > > > > > transaction
> > > > > > > > > > needs to be preserved when keepPreparedTxn=true.  Bumping
> > > epoch
> > > > > > > without
> > > > > > > > > > aborting or committing a transaction is tricky because
> > epoch
> > > > is a
> > > > > > > short
> > > > > > > > > > value and it's easy to overflow.  Currently, the overflow
> > > case
> > > > is
> > > > > > > > handled
> > > > > > > > > > by aborting the ongoing transaction, which would send out
> > > > > > transaction
> > > > > > > > > > markers with epoch=Short.MAX_VALUE to the partition
> > leaders,
> > > > > which
> > > > > > > > would
> > > > > > > > > > fence off any messages with the producer id that started
> > the
> > > > > > > > transaction
> > > > > > > > > > (they would have epoch that is less than
> Short.MAX_VALUE).
> > > > Then
> > > > > it
> > > > > > > is
> > > > > > > > > safe
> > > > > > > > > > to allocate a new producer id and use it in new
> > transactions.
> > > > > > > > > >
> > > > > > > > > > We could say that maybe when keepPreparedTxn=true we bump
> > > epoch
> > > > > > > unless
> > > > > > > > it
> > > > > > > > > > leads to overflow, and don't bump epoch in the overflow
> > case.
> > > > I
> > > > > > > don't
> > > > > > > > > > think it's a good solution because if it's not safe to
> keep
> > > the
> > > > > > same
> > > > > > > > > epoch
> > > > > > > > > > when keepPreparedTxn=true, then we must handle the epoch
> > > > overflow
> > > > > > > case
> > > > > > > > as
> > > > > > > > > > well.  So either we should convince ourselves that it's
> > safe
> > > to
> > > > > > keep
> > > > > > > > the
> > > > > > > > > > epoch and do it in the general case, or we always bump
> the
> > > > epoch
> > > > > > and
> > > > > > > > > handle
> > > > > > > > > > the overflow.
> > > > > > > > > >
> > > > > > > > > > With KIP-890, we bump the epoch on every transaction
> > commit /
> > > > > > abort.
> > > > > > > > > This
> > > > > > > > > > guarantees that even if
> > InitProducerId(keepPreparedTxn=true)
> > > > > > doesn't
> > > > > > > > > > increment epoch on the ongoing transaction, the client
> will
> > > > have
> > > > > to
> > > > > > > > call
> > > > > > > > > > commit or abort to finish the transaction and will
> > increment
> > > > the
> > > > > > > epoch
> > > > > > > > > (and
> > > > > > > > > > handle epoch overflow, if needed).  If the ongoing
> > > transaction
> > > > > was
> > > > > > > in a
> > > > > > > > > bad
> > > > > > > > > > state and had some zombies waiting to arrive, the abort
> > > > operation
> > > > > > > would
> > > > > > > > > > fence them because with KIP-890 every abort would bump
> the
> > > > epoch.
> > > > > > > > > >
> > > > > > > > > > We could also look at this from the following
> perspective.
> > > > With
> > > > > > > > KIP-890,
> > > > > > > > > > zombies won't be able to cross transaction boundaries;
> each
> > > > > > > transaction
> > > > > > > > > > completion creates a boundary and any activity in the
> past
> > > gets
> > > > > > > > confined
> > > > > > > > > in
> > > > > > > > > > the boundary.  Then data in any partition would look like
> > > this:
> > > > > > > > > >
> > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > >
> > > > > > > > > > Now if we inject steps 3a and 3b like this:
> > > > > > > > > >
> > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > 3a. crash
> > > > > > > > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > >
> > > > > > > > > > The invariant still holds even with steps 3a and 3b --
> > > whatever
> > > > > > > > activity
> > > > > > > > > > was in the past will get confined in the past with
> > mandatory
> > > > > abort
> > > > > > /
> > > > > > > > > commit
> > > > > > > > > > that must follow  InitProducerId(keepPreparedTxn=true).
> > > > > > > > > >
> > > > > > > > > > So KIP-890 provides the proper isolation between
> > > transactions,
> > > > so
> > > > > > > > > > injecting crash + InitProducerId(keepPreparedTxn=true)
> into
> > > the
> > > > > > > > > > transaction sequence is safe from the zombie protection
> > > > > > perspective.
> > > > > > > > > >
> > > > > > > > > > > That said, I'm still thinking about it and looking for
> > > > > > > > > > > cases that might break because we don't bump the epoch on
> > > > > > > > > > > InitProducerId(keepPreparedTxn=true).  If such cases exist,
> > > > > > > > > > > we'll need to develop the logic to handle epoch overflow
> > > > > > > > > > > for ongoing transactions.
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > >> Hey Artem,
> > > > > > > > > >>
> > > > > > > > > > >> Thanks for the KIP. I had a question about epoch bumping.
> > > > > > > > > > >>
> > > > > > > > > > >> Previously when we send an InitProducerId request on
> > > > > > > > > > >> Producer startup, we bump the epoch and abort the
> > > > > > > > > > >> transaction. Is it correct to assume that we will still
> > > > > > > > > > >> bump the epoch, but just not abort the transaction?
> > > > > > > > > > >> If we still bump the epoch in this case, how does this
> > > > > > > > > > >> interact with KIP-890, where we also bump the epoch on
> > > > > > > > > > >> every transaction? (I think this means that we may skip
> > > > > > > > > > >> epochs and the data itself will all have the same epoch.)
> > > > > > > > > > >>
> > > > > > > > > > >> I may have follow ups depending on the answer to this. :)
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Justine
> > > > > > > > > >>
> > > > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > > > > > > > >> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hi Alex,
> > > > > > > > > >> >
> > > > > > > > > >> > Thank you for your questions.
> > > > > > > > > >> >
> > > > > > > > > > >> > > the purpose of having broker-level
> > > > > > > > > > >> > > transaction.two.phase.commit.enable
> > > > > > > > > > >> >
> > > > > > > > > > >> > The thinking is that 2PC is a bit of an advanced construct,
> > > > > > > > > > >> > so enabling 2PC in a Kafka cluster should be an explicit
> > > > > > > > > > >> > decision.  If it is set to 'false', InitProducerId (and
> > > > > > > > > > >> > initTransactions) would
> > > > > > > > > > >> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > > > > > > > >> >
> > > > > > > > > > >> > > WDYT about adding an AdminClient method that returns the
> > > > > > > > > > >> > > state of transaction.two.phase.commit.enable
> > > > > > > > > > >> >
> > > > > > > > > > >> > I wonder if the client could just try to use 2PC and then
> > > > > > > > > > >> > handle the error (e.g. if it needs to fall back to ordinary
> > > > > > > > > > >> > transactions).  This way it could uniformly handle cases
> > > > > > > > > > >> > when the Kafka cluster doesn't support 2PC at all and cases
> > > > > > > > > > >> > when 2PC is restricted to certain users.  We could also
> > > > > > > > > > >> > expose this config in describeConfigs, if the fallback
> > > > > > > > > > >> > approach doesn't work for some scenarios.
> > > > > > > > > >> >
> > > > > > > > > >> > -Artem
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> > > > > > > > > >> > <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Hi Artem,
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thanks for publishing this KIP!
> > > > > > > > > >> > >
> > > > > > > > > > >> > > Can you please clarify the purpose of having broker-level
> > > > > > > > > > >> > > transaction.two.phase.commit.enable config in addition to
> > > > > > > > > > >> > > the new ACL? If the brokers are configured with
> > > > > > > > > > >> > > transaction.two.phase.commit.enable=false,
> > > > > > > > > > >> > > at what point will a client configured with
> > > > > > > > > > >> > > transaction.two.phase.commit.enable=true fail? Will it
> > > > > > > > > > >> > > happen at KafkaProducer#initTransactions?
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > WDYT about adding an AdminClient method that returns the
> > > > > > > > > > >> > > state of transaction.two.phase.commit.enable? This way,
> > > > > > > > > > >> > > clients would know in advance if 2PC is enabled on the
> > > > > > > > > > >> > > brokers.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Best,
> > > > > > > > > >> > > Alex
> > > > > > > > > >> > >
> > > > > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover
> > > > > > > > > > >> > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > > Other than supporting multiplexing transactional streams
> > > > > > > > > > >> > > > on a single producer, I don't see how to improve it.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > > > > > > > > >> > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Hi Roger,
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Thank you for summarizing the cons.  I agree and I'm
> > > > > > > > > > >> > > > > curious what would be the alternatives to solve these
> > > > > > > > > > >> > > > > problems better and if they can be incorporated into
> > > > > > > > > > >> > > > > this proposal (or built independently in addition to or
> > > > > > > > > > >> > > > > on top of this proposal).  E.g. one potential extension
> > > > > > > > > > >> > > > > we discussed earlier in the thread could be multiplexing
> > > > > > > > > > >> > > > > logical transactional "streams" with a single producer.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > -Artem
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover
> > > > > > > > > > >> > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > > Thanks.  I like that you're moving Kafka toward
> > > > > > > > > > >> > > > > > supporting this dual-write pattern.  Each use case
> > > > > > > > > > >> > > > > > needs to consider the tradeoffs.  You already
> > > > > > > > > > >> > > > > > summarized the pros very well in the KIP.  I would
> > > > > > > > > > >> > > > > > summarize the cons as follows:
> > > > > > > > > > >> > > > > > - you sacrifice availability - each write requires both
> > > > > > > > > > >> > > > > > DB and Kafka to be available, so I think your overall
> > > > > > > > > > >> > > > > > application availability is
> > > > > > > > > > >> > > > > > (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
> > > > > > > > > > >> > > > > > - latency will be higher and throughput lower - each
> > > > > > > > > > >> > > > > > write requires both writes to DB and Kafka while
> > > > > > > > > > >> > > > > > holding an exclusive lock in DB.
> > > > > > > > > > >> > > > > > - you need to create a producer per unit of concurrency
> > > > > > > > > > >> > > > > > in your app which has some overhead in the app and
> > > > > > > > > > >> > > > > > Kafka side (number of connections, poor batching).  I
> > > > > > > > > > >> > > > > > assume the producers would need to be configured for
> > > > > > > > > > >> > > > > > low latency (linger.ms=0)
> > > > > > > > > > >> > > > > > - there's some complexity in managing stable
> > > > > > > > > > >> > > > > > transactional ids for each producer/concurrency unit in
> > > > > > > > > > >> > > > > > your application.  With k8s deployment, you may need to
> > > > > > > > > > >> > > > > > switch to something like a StatefulSet that gives each
> > > > > > > > > > >> > > > > > pod a stable identity across restarts.  On top of that
> > > > > > > > > > >> > > > > > pod identity which you can use as a prefix, you then
> > > > > > > > > > >> > > > > > assign unique transactional ids to each
> > > > > > > > > > >> > > > > > concurrency unit (thread/goroutine).
> > > > > > > > > >> > > > > >
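As a quick arithmetic check of the availability point in the first bullet, assuming the two systems fail independently: a write that needs both the DB and Kafka succeeds only when both are up, so the two availabilities multiply. The function name and the failure probabilities below are made-up values for illustration, not measurements.

```python
def availability_when_both_required(p_db_down, p_kafka_down):
    """Chance a dual write can proceed when it needs DB and Kafka both up.

    Assumes the two systems fail independently of each other.
    """
    return (1 - p_db_down) * (1 - p_kafka_down)


# Hypothetical numbers: each system is unavailable 0.1% of the time.
single = 1 - 0.001
combined = availability_when_both_required(0.001, 0.001)
print(f"one system alone: {single:.6f}")
print(f"both required:    {combined:.6f}")
```

With these inputs the combined figure comes out a little below either system's own availability, which is the cost the bullet describes.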
> > > > > > > > > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > > > > > > > > >> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > > Hi Roger,
> > > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > Thank you for the feedback.  You make a very good
> > > > > > > > > > >> > > > > > > point that we also discussed internally.  Adding
> > > > > > > > > > >> > > > > > > support for multiple concurrent transactions in one
> > > > > > > > > > >> > > > > > > producer could be valuable but it seems to be a
> > > > > > > > > > >> > > > > > > fairly large and independent change that would
> > > > > > > > > > >> > > > > > > deserve a separate KIP.  If such support is added we
> > > > > > > > > > >> > > > > > > could modify 2PC functionality to incorporate that.
> > > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > > Maybe not too bad but a bit of pain to manage these
> > > > > > > > > > >> > > > > > > > ids inside each process and across all application
> > > > > > > > > > >> > > > > > > > processes.
> > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > I'm not sure if supporting multiple transactions in
> > > > > > > > > > >> > > > > > > one producer would make id management simpler: we'd
> > > > > > > > > > >> > > > > > > need to store a piece of data per transaction, so
> > > > > > > > > > >> > > > > > > whether it's N producers with a single transaction
> > > > > > > > > > >> > > > > > > or N transactions with a single producer, it's still
> > > > > > > > > > >> > > > > > > roughly the same amount of data to manage.  In fact,
> > > > > > > > > > >> > > > > > > managing transactional ids (current proposal) might
> > > > > > > > > > >> > > > > > > be easier, because the id is controlled by the
> > > > > > > > > > >> > > > > > > application and it knows how to complete the
> > > > > > > > > > >> > > > > > > transaction after crash / restart; while a TID would
> > > > > > > > > > >> > > > > > > be generated by Kafka and that would create a
> > > > > > > > > > >> > > > > > > question of starting a Kafka transaction, but not
> > > > > > > > > > >> > > > > > > saving its TID and then crashing, then figuring out
> > > > > > > > > > >> > > > > > > which transactions to abort, and so on.
> > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > > > > > > > > > >> > > > > > > > slot in the application
> > > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > This is a very valid concern.  Maybe we'd need to
> > > > > > > > > > >> > > > > > > have some multiplexing of transactional logical
> > > > > > > > > > >> > > > > > > "streams" over the same connection.  Seems like a
> > > > > > > > > > >> > > > > > > separate KIP, though.
> > > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > > > > > > > > > >> > > > > > > > model per application process?
> > > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > That's a fair assessment.  Not necessarily exactly
> > > > > > > > > > >> > > > > > > single-threaded per application, but a single
> > > > > > > > > > >> > > > > > > producer per thread model (i.e. an application could
> > > > > > > > > > >> > > > > > > have a pool of threads + producers to increase
> > > > > > > > > > >> > > > > > > concurrency).
> > > > > > > > > >> > > > > > >
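For the "pool of threads + producers" model, one way to keep transactional ids stable across restarts is to derive them from a stable process identity plus a slot index. A minimal sketch under stated assumptions: the helper name, the "-txn-" separator, and the example identity "orders-app-0" are all hypothetical, and the only property that matters is that a restarted process computes the same ids again.

```python
def transactional_ids(process_identity, concurrency):
    """Derive one stable transactional id per concurrency slot.

    process_identity must itself survive restarts (for example a stable
    pod name); the naming scheme here is purely illustrative.
    """
    return [f"{process_identity}-txn-{slot}" for slot in range(concurrency)]


# One id per producer in the pool; a restart recomputes the same list.
ids = transactional_ids("orders-app-0", 4)
ids_after_restart = transactional_ids("orders-app-0", 4)
print(ids)
```

Each slot's producer would then be created with its own id, so after a crash the replacement producer for slot N takes over the same transactional id.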
> > > > > > > > > >> > > > > > > -Artem
> > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover
> > > > > > > > > > >> > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > > Artem,
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Thanks for the reply.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > If I understand correctly, Kafka does not support
> > > > > > > > > > >> > > > > > > > concurrent transactions from the same producer
> > > > > > > > > > >> > > > > > > > (transactional id).  I think this means that
> > > > > > > > > > >> > > > > > > > applications that want to support in-process
> > > > > > > > > > >> > > > > > > > concurrency (say thread-level concurrency with
> > > > > > > > > > >> > > > > > > > row-level DB locking) would need to manage separate
> > > > > > > > > > >> > > > > > > > transactional ids and producers per thread and then
> > > > > > > > > > >> > > > > > > > store txn state accordingly.   The potential
> > > > > > > > > > >> > > > > > > > usability downsides I see are
> > > > > > > > > > >> > > > > > > > 1) managing a set of transactional ids for each
> > > > > > > > > > >> > > > > > > > application process that scales up to its max
> > > > > > > > > > >> > > > > > > > concurrency.  Maybe not too bad but a bit of pain to
> > > > > > > > > > >> > > > > > > > manage these ids inside each process and across all
> > > > > > > > > > >> > > > > > > > application processes.
> > > > > > > > > > >> > > > > > > > 2) creating a separate producer for each concurrency
> > > > > > > > > > >> > > > > > > > slot in the application - this could create a lot
> > > > > > > > > > >> > > > > > > > more producers and resultant connections to Kafka
> > > > > > > > > > >> > > > > > > > than the typical model of a single producer per
> > > > > > > > > > >> > > > > > > > process.
> > > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with single-threaded
> > > > > > > > > > >> > > > > > > > model per application process?
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Thanks,
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Roger
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > > > > > > > > >> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > > Hi Roger, Arjun,
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Thank you for the questions.
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > It looks like the application must have stable
> > > > > > > > > > >> > > > > > > > > > transactional ids over time?
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > The transactional id should uniquely identify a
> > > > > > > > > > >> > > > > > > > > producer instance and needs to be stable across
> > > > > > > > > > >> > > > > > > > > the restarts.  If the transactional id is not
> > > > > > > > > > >> > > > > > > > > stable across restarts, then zombie messages from
> > > > > > > > > > >> > > > > > > > > a previous incarnation of the producer may violate
> > > > > > > > > > >> > > > > > > > > atomicity.  If there are 2 producer instances
> > > > > > > > > > >> > > > > > > > > concurrently producing data with the same
> > > > > > > > > > >> > > > > > > > > transactional id, they are going to constantly
> > > > > > > > > > >> > > > > > > > > fence each other and most likely make little or no
> > > > > > > > > > >> > > > > > > > > progress.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > The name might be a little bit confusing as it may
> > > > > > > > > > >> > > > > > > > > be mistaken for a transaction id / TID that
> > > > > > > > > > >> > > > > > > > > uniquely identifies every transaction.  The name
> > > > > > > > > > >> > > > > > > > > and the semantics were defined in the original
> > > > > > > > > > >> > > > > > > > > exactly-once-semantics (EoS) proposal (
> > > > > > > > > > >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > >> > > > > > > > > )
> > > > > > > > > > >> > > > > > > > > and KIP-939 just builds on top of that.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > I'm curious to understand what happens if the
> > > > > > > > > > >> > > > > > > > > > producer dies, and does not come up and recover
> > > > > > > > > > >> > > > > > > > > > the pending transaction within the transaction
> > > > > > > > > > >> > > > > > > > > > timeout interval.
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > If the producer / application never comes back,
> > > > > > > > > > >> > > > > > > > > the transaction will remain in prepared (a.k.a.
> > > > > > > > > > >> > > > > > > > > "in-doubt") state until an operator forcefully
> > > > > > > > > > >> > > > > > > > > terminates the transaction.  That's why a new ACL
> > > > > > > > > > >> > > > > > > > > is defined in this proposal -- this functionality
> > > > > > > > > > >> > > > > > > > > should only be provided to applications
> > > > > > > > > > >> > > > > > > > > that implement proper recovery logic.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > -Artem
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > > > > > > > >> > > > > > > > > <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > Hello Artem,
> > > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > I have the same question as Roger on concurrent
> > > > > > > > > > >> > > > > > > > > > writes, and an additional one on consumer
> > > > > > > > > > >> > > > > > > > > > behavior. Typically, transactions will time out
> > > > > > > > > > >> > > > > > > > > > if not committed within some time interval. With
> > > > > > > > > > >> > > > > > > > > > the proposed changes in this KIP, consumers
> > > > > > > > > > >> > > > > > > > > > cannot consume past the ongoing transaction. I'm
> > > > > > > > > > >> > > > > > > > > > curious to understand what happens if the
> > > > > > > > > > >> > > > > > > > > > producer dies, and does not come up and recover
> > > > > > > > > > >> > > > > > > > > > the pending transaction within the transaction
> > > > > > > > > > >> > > > > > > > > > timeout interval. Or are we saying that when used
> > > > > > > > > > >> > > > > > > > > > in this 2PC context, we should configure these
> > > > > > > > > > >> > > > > > > > > > transaction timeouts to very large durations?
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > Thanks in advance!
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > Best,
> > > > > > > > > >> > > > > > > > > > Arjun
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > > > > > > >> > > > > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > Hi Artem,
> > > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > Thanks for writing this KIP.  Can you clarify
> > > > > > > > > > >> > > > > > > > > > > the requirements a bit more for managing
> > > > > > > > > > >> > > > > > > > > > > transaction state?  It looks like the
> > > > > > > > > > >> > > > > > > > > > > application must have stable transactional ids
> > > > > > > > > > >> > > > > > > > > > > over time?   What is the granularity of those
> > > > > > > > > > >> > > > > > > > > > > ids and producers?  Say the application is a
> > > > > > > > > > >> > > > > > > > > > > multi-threaded Java web server, can/should all
> > > > > > > > > > >> > > > > > > > > > > the concurrent threads share a transactional id
> > > > > > > > > > >> > > > > > > > > > > and producer?  That doesn't seem right to me
> > > > > > > > > > >> > > > > > > > > > > unless the application is using global DB locks
> > > > > > > > > > >> > > > > > > > > > > that serialize all requests.  Instead, if the
> > > > > > > > > > >> > > > > > > > > > > application uses row-level DB locks, there
> > > > > > > > > > >> > > > > > > > > > > could be multiple, concurrent, independent txns
> > > > > > > > > > >> > > > > > > > > > > happening in the same JVM so it seems like the
> > > > > > > > > > >> > > > > > > > > > > granularity managing transactional ids and txn
> > > > > > > > > > >> > > > > > > > > > > state needs to line up with granularity of the
> > > > > > > > > > >> > > > > > > > > > > DB locking.
> > > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > Does that make sense or am I misunderstanding?
> > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > > Thanks,
> > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > > Roger
> > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > > > > > >> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > > Hello,
> > > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > > This is a discussion thread for
> > > > > > > > > > >> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > > > > >> > > > > > > > > > > > .
> > > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > > The KIP proposes extending Kafka transaction
> > > > > > > > > > >> > > > > > > > > > > > support (that already uses 2PC under the hood)
> > > > > > > > > > >> > > > > > > > > > > > to enable atomicity of dual writes to Kafka
> > > > > > > > > > >> > > > > > > > > > > > and an external database, and helps to fix a
> > > > > > > > > > >> > > > > > > > > > > > long standing Flink issue.
> > > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > > An example of code that uses the dual write
> > > > > > > > > > >> > > > > > > > > > > > recipe with JDBC and should work for most SQL
> > > > > > > > > > >> > > > > > > > > > > > databases is here
> > > > > > > > > > >> > > > > > > > > > > > https://github.com/apache/kafka/pull/14231.
> > > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > > > The FLIP for the sister fix in Flink is here
> > > > > > > > > > >> > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > > > -Artem
> > > > > > > > > >> > > > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > >
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
