Hi Jun,

> 32. ... metric name ...

I've updated the metric name to be
*kafka.coordinator.transaction:type=TransactionStateManager,name=ActiveTransactionOpenTimeMax*.
Let me know if it works.

-Artem

On Thu, Feb 29, 2024 at 12:03 PM Artem Livshits <alivsh...@confluent.io> wrote:

> Hi Jun,
>
> > So, it doesn't provide the same guarantees as 2PC either.
>
> I think the key point is that we don't claim 2PC guarantees in that case.
> Maybe it's splitting hairs from the technical perspective (at the end of
> the day, if the operator doesn't let the user use 2PC, it's going to be a
> "works until timeout" solution), but from the user model perspective it
> provides a clear structure:
>
> - if 2PC is possible then all guarantees are in place and there is no gray
>   area where we sort of provide guarantees but not fully
> - if 2PC is not possible, then it's a well-informed constraint / decision
>   with well-known characteristics and the user can choose whether this is
>   acceptable or not for them
>
> Maybe we can look at it from a slightly different perspective: we are not
> making a choice between allowing or not allowing using keepPreparedTxn=true
> when 2PC=false (even though that's exactly how it looks from the KIP). In
> fact, the choice we're making is whether Flink will be able to use an
> official API when 2PC is not possible (and I think we've converged to agree
> that sometimes it won't be) or keep using a reflection hack. In other
> words, we already have a hacky implementation for the case of
> keepPreparedTxn=true + 2PC=false; our choice is only whether we provide an
> official API for that or not.
>
> In general, if someone goes and implements a reflection-based solution,
> that's an indication that there is a gap in public APIs. And we can debate
> whether keepPreparedTxn=true + 2PC=false is the right API or not; and if we
> think it's not, then we should provide an alternative. Right now the
> alternative is to just keep using the reflection and I think it's always
> worse than using a public API.
>
> -Artem
>
> On Wed, Feb 28, 2024 at 2:23 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
>> Hi, Artem,
>>
>> Thanks for the reply.
>>
>> I understand your concern on having a timeout breaking the 2PC guarantees.
>> However, the fallback plan to disable 2PC with an independent
>> keepPreparedTxn is subject to the timeout too. So, it doesn't provide the
>> same guarantees as 2PC either.
>>
>> To me, if we provide a new functionality, we should make it easy such that
>> the application developer only needs to implement it in one way, which is
>> always correct. Then, we can consider what additional things are needed to
>> make the operator comfortable enabling it.
>>
>> Jun
>>
>> On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>> > Hi Jun,
>> >
>> > Thank you for the discussion.
>> >
>> > > For 3b, it would be useful to understand the reason why an admin
>> > > doesn't authorize 2PC for self-hosted Flink
>> >
>> > I think the nuance here is that for cloud, there is a cloud admin
>> > (operator) and there is a cluster admin (who, for example, could manage
>> > ACLs on topics, etc.). The 2PC functionality can affect cloud operations,
>> > because a long-running transaction can block the last stable offset and
>> > prevent compaction or data tiering. In a multi-tenant environment, a
>> > long-running transaction that involves consumer offsets may affect data
>> > that is shared by multiple tenants (Flink transactions don't use consumer
>> > offsets, so this is not an issue for Flink, but we'd need a separate ACL
>> > or some other way to express this permission if we wanted to go in that
>> > direction).
>> >
>> > For that reason, I expect 2PC to be controlled by the cloud operator, and
>> > it just may not be scalable for the cloud operator to manage all
>> > potential interactions required to resolve in-doubt transactions
>> > (communicate to the end users, etc.).
>> > In general, we make no assumptions about Kafka applications -- they may
>> > come and go, they may abandon transactional ids and generate new ones.
>> > For 2PC we need to make sure that the application is highly available
>> > and wouldn't easily abandon an open transaction in Kafka.
>> >
>> > > If so, another way to address that is to allow the admin to set a
>> > > timeout even for the 2PC case.
>> >
>> > This effectively abandons the 2PC guarantee because it creates a case for
>> > Kafka to unilaterally make an automatic decision on a prepared
>> > transaction. I think it's fundamental for 2PC to abandon this ability and
>> > wait for the external coordinator for the decision; after all, the
>> > coordinator may legitimately be unavailable for an arbitrary amount of
>> > time. Also, we already have a timeout on regular Kafka transactions;
>> > having another "special" timeout could be confusing, and a large enough
>> > timeout could still produce the undesirable effects for the cloud
>> > operations (so we kind of get the worst of both options -- we don't
>> > provide guarantees and still have impact on operations).
>> >
>> > -Artem
>> >
>> > On Fri, Feb 23, 2024 at 8:55 AM Jun Rao <j...@confluent.io.invalid> wrote:
>> >
>> > > Hi, Artem,
>> > >
>> > > Thanks for the reply.
>> > >
>> > > For 3b, it would be useful to understand the reason why an admin
>> > > doesn't authorize 2PC for self-hosted Flink. Is the main reason that
>> > > 2PC has unbounded timeout that could lead to unbounded outstanding
>> > > transactions? If so, another way to address that is to allow the admin
>> > > to set a timeout even for the 2PC case. The timeout would be long
>> > > enough for well-behaved applications to complete 2PC operations, but
>> > > not too long for misbehaving applications' transactions to hang.
>> > >
>> > > Jun
>> > >
>> > > On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
>> > > <alivsh...@confluent.io.invalid> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > > 20A. One option is to make the API initTransactions(boolean
>> > > > > enable2PC).
>> > > >
>> > > > We could do that. I think there is a little bit of symmetry between
>> > > > the client and server that would get lost with this approach (server
>> > > > has enable2PC as config), but I don't really see a strong reason for
>> > > > enable2PC to be a config vs. an argument for initTransactions. But
>> > > > let's see if we find 20B to be a strong consideration for keeping a
>> > > > separate flag for keepPreparedTxn.
>> > > >
>> > > > > 20B. But realistically, we want Flink (and other apps) to have a
>> > > > > single implementation
>> > > >
>> > > > That's correct and here's what I think can happen if we don't allow
>> > > > independent keepPreparedTxn:
>> > > >
>> > > > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection
>> > > > is used, which effectively implements keepPreparedTxn=true without
>> > > > our explicit support.
>> > > > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
>> > > > either fall back to reflection or we just say we don't support this,
>> > > > have to upgrade Kafka cluster first.
>> > > > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
>> > > > interesting depending on whether the Kafka cluster authorizes 2PC or
>> > > > not:
>> > > > 3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything
>> > > > uses KIP-939 and there is no problem
>> > > > 3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we
>> > > > can either fall back to reflection or use keepPreparedTxn=true even
>> > > > if 2PC is not enabled.
>> > > >
>> > > > It seems to be ok to not support case 2 (i.e.
>> > > > require Kafka upgrade first), it shouldn't be an issue for cloud
>> > > > offerings as cloud providers are likely to upgrade their Kafka to the
>> > > > latest versions.
>> > > >
>> > > > The case 3b seems to be important to support, though -- the latest
>> > > > version of everything should work at least as well as (and preferably
>> > > > better than) previous ones. It's possible to downgrade to case 1, but
>> > > > it's probably not sustainable as newer versions of Flink would also
>> > > > add other features that the customers may want to take advantage of.
>> > > >
>> > > > If we enabled keepPreparedTxn=true even without 2PC, then we could
>> > > > enable case 3b without the need to fall back to reflection, so we
>> > > > could get rid of reflection-based logic and just have a single
>> > > > implementation based on KIP-939.
>> > > >
>> > > > > 32. My suggestion is to change
>> > > >
>> > > > Let me think about it and I'll come back to this.
>> > > >
>> > > > -Artem
>> > > >
>> > > > On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid>
>> > > > wrote:
>> > > >
>> > > > > Hi, Artem,
>> > > > >
>> > > > > Thanks for the reply.
>> > > > >
>> > > > > 20A. One option is to make the API initTransactions(boolean
>> > > > > enable2PC). Then, it's clear from the code whether 2PC related
>> > > > > logic should be added.
>> > > > >
>> > > > > 20B. But realistically, we want Flink (and other apps) to have a
>> > > > > single implementation of the 2PC logic, not two different
>> > > > > implementations, right?
>> > > > >
>> > > > > 32.
>> > > > > My suggestion is to change
>> > > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > to something like
>> > > > >
>> > > > > Metric Name: active-transaction-open-time-max
>> > > > > Type: Max
>> > > > > Group: transaction-coordinator-metrics
>> > > > > Tags: none
>> > > > > Description: The max time a currently-open transaction has been open
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
>> > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > >
>> > > > > > Hi Jun,
>> > > > > >
>> > > > > > > 20A. This only takes care of the abort case. The application
>> > > > > > > still needs to be changed to handle the commit case properly
>> > > > > >
>> > > > > > My point here is that looking at the initTransactions() call it's
>> > > > > > not clear what the semantics are. Say I'm doing code review, I
>> > > > > > cannot say if the code is correct or not -- if the config (that's
>> > > > > > something that's theoretically not known at the time of code
>> > > > > > review) is going to enable 2PC, then the correct code should look
>> > > > > > one way, otherwise it would need to look differently. Also, say
>> > > > > > if code is written with initTransactions() without explicit abort
>> > > > > > and then for whatever reason the code would get used with 2PC
>> > > > > > enabled (could be a library in a bigger product) it'll start
>> > > > > > breaking in a non-intuitive way.
>> > > > > >
>> > > > > > > 20B. Hmm, if the admin disables 2PC, there is likely a reason
>> > > > > > > behind that
>> > > > > >
>> > > > > > That's true, but reality may be more complicated. Say a user
>> > > > > > wants to run a self-managed Flink with Confluent cloud.
>> > > > > > Confluent cloud admin may not be comfortable enabling 2PC for
>> > > > > > general user accounts that use services not managed by Confluent
>> > > > > > (the same way Confluent doesn't allow increasing max transaction
>> > > > > > timeout for general user accounts). Right now, self-managed Flink
>> > > > > > works because it uses reflection; if it moves to use public APIs
>> > > > > > provided by KIP-939 it'll break.
>> > > > > >
>> > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
>> > > > > > > has a group and a name. There is no type and no package name.
>> > > > > >
>> > > > > > Is this a suggestion to change or confirmation that the current
>> > > > > > logic is ok? I just copied an existing metric but can change if
>> > > > > > needed.
>> > > > > >
>> > > > > > -Artem
>> > > > > >
>> > > > > > On Tue, Feb 20, 2024 at 11:25 AM Jun Rao
>> > > > > > <j...@confluent.io.invalid> wrote:
>> > > > > >
>> > > > > > > Hi, Artem,
>> > > > > > >
>> > > > > > > Thanks for the reply.
>> > > > > > >
>> > > > > > > 20. "Say if an application currently uses initTransactions() to
>> > > > > > > achieve the current semantics, it would need to be rewritten to
>> > > > > > > use initTransactions() + abort to achieve the same semantics if
>> > > > > > > the config is changed."
>> > > > > > >
>> > > > > > > This only takes care of the abort case. The application still
>> > > > > > > needs to be changed to handle the commit case properly if
>> > > > > > > transaction.two.phase.commit.enable is set to true.
>> > > > > > >
>> > > > > > > "Even when KIP-939 is implemented, there would be situations
>> > > > > > > when 2PC is disabled by the admin (e.g.
>> > > > > > > Kafka service providers may be reluctant to enable 2PC for
>> > > > > > > Flink services that users host themselves), so we either have
>> > > > > > > to perpetuate the reflection-based implementation in Flink or
>> > > > > > > enable keepPreparedTxn=true without 2PC."
>> > > > > > >
>> > > > > > > Hmm, if the admin disables 2PC, there is likely a reason behind
>> > > > > > > that. I am not sure that we should provide an API to encourage
>> > > > > > > the application to circumvent that.
>> > > > > > >
>> > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
>> > > > > > > has a group and a name. There is no type and no package name.
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
>> > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > >
>> > > > > > > > Hi Jun,
>> > > > > > > >
>> > > > > > > > Thank you for your questions.
>> > > > > > > >
>> > > > > > > > > 20. So to abort a prepared transaction after the producer
>> > > > > > > > > start, we could use ...
>> > > > > > > >
>> > > > > > > > I agree, initTransactions(true) + abort would accomplish the
>> > > > > > > > behavior of initTransactions(false), so we could technically
>> > > > > > > > have fewer ways to achieve the same thing, which is generally
>> > > > > > > > valuable. I wonder, though, if that would be intuitive from
>> > > > > > > > the application perspective. Say if an application currently
>> > > > > > > > uses initTransactions() to achieve the current semantics, it
>> > > > > > > > would need to be rewritten to use initTransactions() + abort
>> > > > > > > > to achieve the same semantics if the config is changed.
>> > > > > > > > I think this could create subtle confusion, as the config
>> > > > > > > > change is generally decoupled from changing application
>> > > > > > > > implementation.
>> > > > > > > >
>> > > > > > > > > The use case mentioned for keepPreparedTxn=true without 2PC
>> > > > > > > > > doesn't seem very important
>> > > > > > > >
>> > > > > > > > I agree, it's not a strict requirement. It is, however, a
>> > > > > > > > missing option in the public API, so currently Flink has to
>> > > > > > > > use reflection to emulate this functionality without 2PC
>> > > > > > > > support. Even when KIP-939 is implemented, there would be
>> > > > > > > > situations when 2PC is disabled by the admin (e.g. Kafka
>> > > > > > > > service providers may be reluctant to enable 2PC for Flink
>> > > > > > > > services that users host themselves), so we either have to
>> > > > > > > > perpetuate the reflection-based implementation in Flink or
>> > > > > > > > enable keepPreparedTxn=true without 2PC.
>> > > > > > > >
>> > > > > > > > > 32.
>> > > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > > > >
>> > > > > > > > I just followed the existing metric implementation example at
>> > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
>> > > > > > > > which maps to
>> > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
>> > > > > > > >
>> > > > > > > > > 33. "If the value is 'true' then the corresponding field is
>> > > > > > > > > set
>> > > > > > > >
>> > > > > > > > That's correct. Updated the KIP.
>> > > > > > > >
>> > > > > > > > -Artem
>> > > > > > > >
>> > > > > > > > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao
>> > > > > > > > <j...@confluent.io.invalid> wrote:
>> > > > > > > >
>> > > > > > > > > Hi, Artem,
>> > > > > > > > >
>> > > > > > > > > Thanks for the reply.
>> > > > > > > > >
>> > > > > > > > > 20. So to abort a prepared transaction after producer
>> > > > > > > > > start, we could use either
>> > > > > > > > >     producer.initTransactions(false)
>> > > > > > > > > or
>> > > > > > > > >     producer.initTransactions(true)
>> > > > > > > > >     producer.abortTransaction
>> > > > > > > > > Could we just always use the latter API? If we do this, we
>> > > > > > > > > could potentially eliminate the keepPreparedTxn flag in
>> > > > > > > > > initTransactions(). After the initTransactions() call, the
>> > > > > > > > > outstanding txn is always preserved if 2pc is enabled and
>> > > > > > > > > aborted if 2pc is disabled. The use case mentioned for
>> > > > > > > > > keepPreparedTxn=true without 2PC doesn't seem very
>> > > > > > > > > important. If we could do that, it seems that we have (1)
>> > > > > > > > > less redundant and simpler APIs; (2) more symmetric syntax
>> > > > > > > > > for aborting/committing a prepared txn after producer
>> > > > > > > > > restart.
>> > > > > > > > >
>> > > > > > > > > 32.
>> > > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>> > > > > > > > > Is this a Yammer or kafka metric?
>> > > > > > > > > The former uses camel case for name and type. The latter
>> > > > > > > > > uses the hyphen notation, but doesn't have the type
>> > > > > > > > > attribute.
>> > > > > > > > >
>> > > > > > > > > 33. "If the value is 'true' then the corresponding field is
>> > > > > > > > > set in the InitProducerIdRequest and the KafkaProducer
>> > > > > > > > > object is set into a state which only allows calling
>> > > > > > > > > .commitTransaction or .abortTransaction."
>> > > > > > > > > We should also allow .completeTransaction, right?
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
>> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jun,
>> > > > > > > > > >
>> > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to
>> > > > > > > > > > > abort and commit a prepared txn are not symmetric.
>> > > > > > > > > >
>> > > > > > > > > > For Flink it is expected that Flink would call
>> > > > > > > > > > .commitTransaction or .abortTransaction directly; it
>> > > > > > > > > > wouldn't need to deal with PreparedTxnState, since the
>> > > > > > > > > > outcome is actually determined by Flink's job manager,
>> > > > > > > > > > not by comparison of PreparedTxnState. So for Flink, if
>> > > > > > > > > > the Kafka sink crashes and restarts there are 2 cases:
>> > > > > > > > > >
>> > > > > > > > > > 1. Transaction is not prepared. In that case just call
>> > > > > > > > > > producer.initTransactions(false) and then can start
>> > > > > > > > > > transactions as needed.
>> > > > > > > > > > 2. Transaction is prepared.
>> > > > > > > > > > In that case call producer.initTransactions(true) and
>> > > > > > > > > > wait for the decision from the job manager. Note that
>> > > > > > > > > > it's not given that the transaction will get committed,
>> > > > > > > > > > the decision could also be an abort.
>> > > > > > > > > >
>> > > > > > > > > > > 21. transaction.max.timeout.ms could in theory be
>> > > > > > > > > > > MAX_INT. Perhaps we could use a negative timeout in the
>> > > > > > > > > > > record to indicate 2PC?
>> > > > > > > > > >
>> > > > > > > > > > -1 sounds good, updated.
>> > > > > > > > > >
>> > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
>> > > > > > > > > > > txn. Do we need both?
>> > > > > > > > > >
>> > > > > > > > > > I think of producer.initTransactions() to be an
>> > > > > > > > > > implementation for
>> > > > > > > > > > adminClient.forceTerminateTransaction(transactionalId).
>> > > > > > > > > >
>> > > > > > > > > > > 31. "This would flush all the pending messages and
>> > > > > > > > > > > transition the producer
>> > > > > > > > > >
>> > > > > > > > > > Updated the KIP to clarify that IllegalStateException
>> > > > > > > > > > will be thrown.
>> > > > > > > > > >
>> > > > > > > > > > -Artem
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao
>> > > > > > > > > > <j...@confluent.io.invalid> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for the reply.
>> > > > > > > > > > >
>> > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to
>> > > > > > > > > > > abort and commit a prepared txn are not symmetric.
>> > > > > > > > > > > To abort, the app will just call
>> > > > > > > > > > >     producer.initTransactions(false)
>> > > > > > > > > > >
>> > > > > > > > > > > To commit, the app needs to call
>> > > > > > > > > > >     producer.initTransactions(true)
>> > > > > > > > > > >     producer.completeTransaction(preparedTxnState)
>> > > > > > > > > > >
>> > > > > > > > > > > Will this be a concern? For the dual-writer usage, both
>> > > > > > > > > > > abort/commit use the same API.
>> > > > > > > > > > >
>> > > > > > > > > > > 21. transaction.max.timeout.ms could in theory be
>> > > > > > > > > > > MAX_INT. Perhaps we could use a negative timeout in the
>> > > > > > > > > > > record to indicate 2PC?
>> > > > > > > > > > >
>> > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
>> > > > > > > > > > > txn. Do we need both?
>> > > > > > > > > > >     producer.initTransactions(false)
>> > > > > > > > > > >     adminClient.forceTerminateTransaction(transactionalId)
>> > > > > > > > > > >
>> > > > > > > > > > > 31. "This would flush all the pending messages and
>> > > > > > > > > > > transition the producer into a mode where only
>> > > > > > > > > > > .commitTransaction, .abortTransaction, or
>> > > > > > > > > > > .completeTransaction could be called. If the call is
>> > > > > > > > > > > successful (all messages successfully got flushed to
>> > > > > > > > > > > all partitions) the transaction is prepared."
>> > > > > > > > > > > If the producer calls send() in that state, what
>> > > > > > > > > > > exception will the caller receive?
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
>> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Then, should we change the following in the example
>> > > > > > > > > > > > > to use InitProducerId(true) instead?
>> > > > > > > > > > > >
>> > > > > > > > > > > > We could. I just thought that it's good to make the
>> > > > > > > > > > > > example self-contained by starting from a clean state.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Also, could Flink just follow the dual-write recipe?
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think it would bring some unnecessary logic to Flink
>> > > > > > > > > > > > (or any other system that already has a transaction
>> > > > > > > > > > > > coordinator and just wants to drive Kafka to the
>> > > > > > > > > > > > desired state). We could discuss it with Flink folks,
>> > > > > > > > > > > > the current proposal was developed in collaboration
>> > > > > > > > > > > > with them.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
>> > > > > > > > > > > > > TransactionTimeoutMs to Integer.MAX_VALUE?
>> > > > > > > > > > > >
>> > > > > > > > > > > > The server would reject this for regular transactions,
>> > > > > > > > > > > > it only accepts values that are <=
>> > > > > > > > > > > > *transaction.max.timeout.ms* (a broker config).
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 24.
>> > > > > > > > > > > > > Hmm, In KIP-890, without 2pc, the coordinator
>> > > > > > > > > > > > > expects the endTxn request to use the ongoing pid.
>> > > > > > > > > > > > > ...
>> > > > > > > > > > > >
>> > > > > > > > > > > > Without 2PC there is no case where the pid could
>> > > > > > > > > > > > change between starting a transaction and endTxn
>> > > > > > > > > > > > (InitProducerId would abort any ongoing transaction).
>> > > > > > > > > > > > With 2PC there is now a case where there could be
>> > > > > > > > > > > > InitProducerId that can change the pid without
>> > > > > > > > > > > > aborting the transaction, so we need to handle that.
>> > > > > > > > > > > > I wouldn't say that the flow is different, but it's
>> > > > > > > > > > > > rather extended to handle new cases. The main
>> > > > > > > > > > > > principle is still the same -- for all operations we
>> > > > > > > > > > > > use the latest "operational" pid and epoch known to
>> > > > > > > > > > > > the client, this way we guarantee that we can fence
>> > > > > > > > > > > > zombie / split brain clients by disrupting the
>> > > > > > > > > > > > "latest known" pid + epoch progression.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 25. "We send out markers using the original ongoing
>> > > > > > > > > > > > > transaction ProducerId and ProducerEpoch" ...
>> > > > > > > > > > > >
>> > > > > > > > > > > > Updated.
>> > > > > > > > > > > >
>> > > > > > > > > > > > -Artem
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao
>> > > > > > > > > > > > <j...@confluent.io.invalid> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the reply.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 20. So for the dual-write recipe, we should always
>> > > > > > > > > > > > > call InitProducerId(keepPreparedTxn=true) from the
>> > > > > > > > > > > > > producer? Then, should we change the following in
>> > > > > > > > > > > > > the example to use InitProducerId(true) instead?
>> > > > > > > > > > > > > 1. InitProducerId(false); TC STATE: Empty,
>> > > > > > > > > > > > > ProducerId=42, ProducerEpoch=MAX-1,
>> > > > > > > > > > > > > PrevProducerId=-1, NextProducerId=-1,
>> > > > > > > > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42,
>> > > > > > > > > > > > > Epoch=MAX-1, OngoingTxnProducerId=-1,
>> > > > > > > > > > > > > OngoingTxnEpoch=-1.
>> > > > > > > > > > > > > Also, could Flink just follow the dual-write recipe?
>> > > > > > > > > > > > > It's simpler if there is one way to solve the 2pc
>> > > > > > > > > > > > > issue.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
>> > > > > > > > > > > > > TransactionTimeoutMs to Integer.MAX_VALUE?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator
>> > > > > > > > > > > > > expects the endTxn request to use the ongoing pid.
>> > > > > > > > > > > > > With 2pc, the coordinator now expects the endTxn
>> > > > > > > > > > > > > request to use the next pid.
>> > > > > > > > > > > > > So, the flow is different, right?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 25. "We send out markers using the original ongoing
>> > > > > > > > > > > > > transaction ProducerId and ProducerEpoch"
>> > > > > > > > > > > > > We should use ProducerEpoch + 1 in the marker,
>> > > > > > > > > > > > > right?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Jun
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
>> > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 20. I am a bit confused by how we set
>> > > > > > > > > > > > > > > keepPreparedTxn. ...
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > keepPreparedTxn=true informs the transaction
>> > > > > > > > > > > > > > coordinator that it should keep the ongoing
>> > > > > > > > > > > > > > transaction, if any. If keepPreparedTxn=false,
>> > > > > > > > > > > > > > then any ongoing transaction is aborted (this is
>> > > > > > > > > > > > > > exactly the current behavior). enable2Pc is a
>> > > > > > > > > > > > > > separate argument that is controlled by the
>> > > > > > > > > > > > > > *transaction.two.phase.commit.enable* setting on
>> > > > > > > > > > > > > > the client.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To start 2PC, the client just needs to set
>> > > > > > > > > > > > > > *transaction.two.phase.commit.enable*=true in the
>> > > > > > > > > > > > > > config.
>> > > > > > > > > > > > > > Then if the client knows the status of the
>> > > > > > > > > > > > > > transaction upfront (in the case of Flink, Flink
>> > > > > > > > > > > > > > keeps the knowledge if the transaction is
>> > > > > > > > > > > > > > prepared in its own store, so it always knows
>> > > > > > > > > > > > > > upfront), it can set keepPreparedTxn accordingly,
>> > > > > > > > > > > > > > then if the transaction was prepared, it'll be
>> > > > > > > > > > > > > > ready for the client to complete the appropriate
>> > > > > > > > > > > > > > action; if the client doesn't know that the
>> > > > > > > > > > > > > > transaction is prepared, keepPreparedTxn is going
>> > > > > > > > > > > > > > to be false, in which case we'll get to a clean
>> > > > > > > > > > > > > > state (the same way we do today).
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > For the dual-write recipe, the client doesn't
>> > > > > > > > > > > > > > know upfront if the transaction is prepared; this
>> > > > > > > > > > > > > > information is implicitly encoded in the
>> > > > > > > > > > > > > > PreparedTxnState value that can be used to
>> > > > > > > > > > > > > > resolve the transaction state. In that case,
>> > > > > > > > > > > > > > keepPreparedTxn should always be true, because we
>> > > > > > > > > > > > > > don't know upfront and we don't want to
>> > > > > > > > > > > > > > accidentally abort a committed transaction.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The forceTerminateTransaction call can just use keepPreparedTxn=false, it actually doesn't matter if it sets Enable2Pc flag.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 21. TransactionLogValue: Do we need some field to identify whether this is written for 2PC so that ongoing txn is never auto aborted?
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was enabled. I've added a note to the KIP about this.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 22
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > You're right it's a typo. I fixed it as well as step 9 (REQUEST: ProducerId=73, ProducerEpoch=MAX).
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config while KeepPreparedTxn is from an API param ...
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The intent to use 2PC doesn't change from transaction to transaction, but the intent to keep prepared txn may change from transaction to transaction. In dual-write recipes the distinction is not clear, but for use cases where keepPreparedTxn value is known upfront (e.g.
>> > > > > > > > > > > > > > Flink) it's more prominent. E.g. a Flink's Kafka sink operator could be deployed with *transaction.two.phase.commit.enable*=true hardcoded in the image, but keepPreparedTxn cannot be hardcoded in the image, because it depends on the job manager's state.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 24
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The flow is actually going to be the same way as it is now -- the "main" producer id + epoch needs to be used in all operations to prevent fencing (it's sort of a common "header" in all RPC calls that follow the same rules). The ongoing txn info is just additional info for making a commit / abort decision based on the PreparedTxnState from the DB.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > --Artem
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Thanks for the reply. A few more comments.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I got the following: (1) to start 2pc, we call InitProducerId(keepPreparedTxn=false); (2) when the producer fails and needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3) Admin.forceTerminateTransaction() calls InitProducerId(keepPreparedTxn=false).
>> > > > > > > > > > > > > > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc enabled, and there is an ongoing txn, should the server return an error to the InitProducerId request? If so, what would be the error code?
>> > > > > > > > > > > > > > > 20.2 How do we distinguish between (1) and (3)? It's the same API call but (1) doesn't abort ongoing txn and (3) does.
>> > > > > > > > > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing txn. So, setting keepPreparedTxn to false to start 2pc seems counter intuitive.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 21.
>> > > > > > > > > > > > > > > TransactionLogValue: Do we need some field to identify whether this is written for 2PC so that ongoing txn is never auto aborted?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
>> > > > > > > > > > > > > > > It seems in the above example, Epoch in RESPONSE should be MAX to match NextProducerEpoch?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config while KeepPreparedTxn is from an API param. Should we make them more consistent since they seem related?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0, When a commit request is sent, it uses the latest ProducerId and ProducerEpoch."
>> > > > > > > > > > > > > > > The step where we use the next producerId to commit an old txn works, but can be confusing. It's going to be hard for people implementing this new client protocol to figure out when to use the current or the new producerId in the EndTxnRequest. One potential way to improve this is to extend EndTxnRequest with a new field like expectedNextProducerId. Then we can always use the old producerId in the existing field, but set expectedNextProducerId to bypass the fencing logic when needed.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Jun
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thank you for the comments.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > I added a note that all combinations are valid.
>> > > > > > > > > > > > > > > > Enable2Pc=false & KeepPreparedTxn=true could be potentially useful for backward compatibility with Flink, when the new version of Flink that implements KIP-319 tries to work with a cluster that doesn't authorize 2PC.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 11. InitProducerIdResponse: If there is no ongoing txn, what will OngoingTxnProducerId and OngoingTxnEpoch be set to?
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > I added a note that they will be set to -1. The client then will know that there is no ongoing txn and .completeTransaction becomes a no-op (but still required before .send is enabled).
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are already covered by KIP-994?
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Removed from this KIP.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 13. TransactionalLogValue ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > This is now updated to work on top of KIP-890.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the ongoing transaction ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > This is now updated to work on top of KIP-890.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 15. active-transaction-total-time-max : ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Updated.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be ‘false’. If it’s ‘false’, 2PC functionality is disabled even if the ACL is set ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Disabling 2PC effectively removes all authorization to use it, hence I thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Do you suggest using a different error code for 2PC authorization vs some other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) or a different code for disabled vs. unauthorised (e.g. TWO_PHASE_COMMIT_DISABLED) or both?
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 17. completeTransaction(). We expect this to be only used during recovery.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > It can also be used if, say, a commit to the database fails and the result is inconclusive, e.g.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > 1. Begin DB transaction
>> > > > > > > > > > > > > > > > 2. Begin Kafka transaction
>> > > > > > > > > > > > > > > > 3. Prepare Kafka transaction
>> > > > > > > > > > > > > > > > 4. Commit DB transaction
>> > > > > > > > > > > > > > > > 5. The DB commit fails, figure out the state of the transaction by reading the PreparedTxnState from DB
>> > > > > > > > > > > > > > > > 6. Complete Kafka transaction with the PreparedTxnState.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 18. "either prepareTransaction was called or initTransaction(true) was called": "either" should be "neither"?
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Updated.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates a situation ...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > InitProducerId only bumps the producer epoch, the ongoing transaction epoch stays the same, no matter how many times the InitProducerId is called before the transaction is completed. Eventually the epoch may overflow, and then a new producer id would be allocated, but the ongoing transaction producer id would stay the same.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > I've added a couple examples in the KIP ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges ) that walk through some scenarios and show how the state is changed.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > -Artem
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hi, Artem,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Thanks for the KIP. A few comments below.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn in InitProducerId, it would be useful to document a bit more detail on what values are set under what cases. For example, are all four combinations valid?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 11. InitProducerIdResponse: If there is no ongoing txn, what will OngoingTxnProducerId and OngoingTxnEpoch be set to?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are already covered by KIP-994?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 13. TransactionalLogValue: Could we name TransactionProducerId and ProducerId better?
>> > > > > > > > > > > > > > > > > It's not clear from the name which is for which.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the ongoing transaction is going to be written instead of the existing ProducerId and ProducerEpoch fields (which are renamed to reflect the semantics) to support downgrade.": I am a bit confused on that. Are we writing different values to the existing fields? Then, we can't downgrade, right?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 15. active-transaction-total-time-max : Would active-transaction-open-time-max be more intuitive? Also, could we include the full name (group, tags, etc)?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be ‘false’.
>> > > > > > > > > > > > > > > > > If it’s ‘false’, 2PC functionality is disabled even if the ACL is set, clients that attempt to use this functionality would receive TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
>> > > > > > > > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client to understand what the actual cause is.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 17. completeTransaction(). We expect this to be only used during recovery. Could we document this clearly? Could we prevent it from being used incorrectly (e.g. throw an exception if the producer has called other methods like send())?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 18. "either prepareTransaction was called or initTransaction(true) was called": "either" should be "neither"?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 19.
>> > > > > > > > > > > > > > > > > Since InitProducerId always bumps up the epoch, it creates a situation where there could be multiple outstanding txns. The following is an example of a potential problem during recovery.
>> > > > > > > > > > > > > > > > >    The last txn epoch in the external store is 41 when the app dies.
>> > > > > > > > > > > > > > > > >    Instance1 is created for recovery.
>> > > > > > > > > > > > > > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42, ongoingEpoch=41
>> > > > > > > > > > > > > > > > >      2. (instance1) dies before completeTxn(41) can be called.
>> > > > > > > > > > > > > > > > >    Instance2 is created for recovery.
>> > > > > > > > > > > > > > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43, ongoingEpoch=42
>> > > > > > > > > > > > > > > > >      4. (instance2) completeTxn(41) => abort
>> > > > > > > > > > > > > > > > >    The first problem is that 41 now is aborted when it should be committed. The second one is that it's not clear who could abort epoch 42, which is still open.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Jun
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Hey Artem,
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Thanks for the updates. I think what you say makes sense. I just updated my KIP so I want to reconcile some of the changes we made especially with respect to the TransactionLogValue.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Firstly, I believe tagged fields require a default value so that if they are not filled, we return the default (and know that they were empty). For my KIP, I proposed the default for producer ID tagged fields should be -1.
>> > > > > > > > > > > > > > > > > > I was wondering if we could update the KIP to include the default values for producer ID and epoch.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Next, I noticed we decided to rename the fields. I guess that the field "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that correct? So we would have "TransactionProducerId" for the non-tagged field and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged fields in the final version after KIP-890 and KIP-936 are implemented. Is this correct? I think the tags will need updating, but that is trivial.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > The final question I had was with respect to storing the new epoch.
>> > > > > > > > > > > > > > > > > > In KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to store the epoch since we can interpret the previous epoch based on the producer ID. But here we could call the InitProducerId multiple times and we only want the producer with the correct epoch to be able to commit the transaction. Is that the correct reasoning for why we need epoch here but not the Prepare/Commit state?
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > Justine
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Hi Justine,
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > After thinking a bit about supporting atomic dual writes for Kafka + NoSQL database, I came to a conclusion that we do need to bump the epoch even with InitProducerId(keepPreparedTxn=true). As I described in my previous email, we wouldn't need to bump the epoch to protect from zombies so that reasoning is still true. But we cannot protect from split-brain scenarios when two or more instances of a producer with the same transactional id try to produce at the same time.
>> > > > > > > > > > > > > > > > > > > The dual-write example for SQL databases ( https://github.com/apache/kafka/pull/14231/files ) doesn't have a split-brain problem because execution is protected by the update lock on the transaction state record; however NoSQL databases may not have this protection (I'll write an example for NoSQL database dual-write soon).
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > In a nutshell, here is an example of a split-brain scenario:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >    1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
>> > > > > > > > > > > > > > > > > > >    2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
>> > > > > > > > > > > > > > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
>> > > > > > > > > > > > > > > > > > >    4.
(instance2) CommitTxn, this is >> > > > > considered a >> > > > > > > > > retry, >> > > > > > > > > > so >> > > > > > > > > > > > it >> > > > > > > > > > > > > > got >> > > > > > > > > > > > > > > > > epoch >> > > > > > > > > > > > > > > > > > 43 >> > > > > > > > > > > > > > > > > > > as well >> > > > > > > > > > > > > > > > > > > 5. (instance1) Produce messageA >> > > > w/sequence 1 >> > > > > > > > > > > > > > > > > > > 6. (instance2) Produce messageB >> > > w/sequence >> > > > > 1, >> > > > > > > this >> > > > > > > > > is >> > > > > > > > > > > > > > > considered a >> > > > > > > > > > > > > > > > > > > duplicate >> > > > > > > > > > > > > > > > > > > 7. (instance2) Produce messageC >> > > > w/sequence 2 >> > > > > > > > > > > > > > > > > > > 8. (instance1) Produce messageD >> > > w/sequence >> > > > > 2, >> > > > > > > this >> > > > > > > > > is >> > > > > > > > > > > > > > > considered a >> > > > > > > > > > > > > > > > > > > duplicate >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > Now if either of those commit the >> > > > transaction, >> > > > > it >> > > > > > > > would >> > > > > > > > > > > have >> > > > > > > > > > > > a >> > > > > > > > > > > > > > mix >> > > > > > > > > > > > > > > of >> > > > > > > > > > > > > > > > > > > messages from the two instances >> (messageA >> > > and >> > > > > > > > > messageC). >> > > > > > > > > > > > With >> > > > > > > > > > > > > > the >> > > > > > > > > > > > > > > > > proper >> > > > > > > > > > > > > > > > > > > epoch bump, instance1 would get >> fenced at >> > > > step >> > > > > 3. 
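
To make the eight steps above concrete, here is a small Python replay of the scenario. It is only a sketch under simplifying assumptions: `ToyCoordinator`, `ToyPartition` and `replay` are made-up names, retry detection is reduced to "the epoch equals the one that was just committed", and duplicate detection to "the sequence is not higher than the last one seen for that epoch". None of this is Kafka's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


class Fenced(Exception):
    """A request arrived with an epoch the coordinator no longer accepts."""


@dataclass
class ToyCoordinator:
    bump_on_init: bool                       # True models bumping the epoch in InitProducerId
    epoch: int = 42
    last_committed_epoch: Optional[int] = None

    def init_producer_id(self) -> int:
        if self.bump_on_init:
            self.epoch += 1
        return self.epoch

    def commit_txn(self, client_epoch: int) -> int:
        if client_epoch == self.epoch:
            self.last_committed_epoch = client_epoch
            self.epoch += 1                  # every commit bumps the epoch
            return self.epoch
        if client_epoch == self.last_committed_epoch:
            return self.epoch                # looks like a retry of the same commit
        raise Fenced(f"stale epoch {client_epoch}, current {self.epoch}")


@dataclass
class ToyPartition:
    log: List[str] = field(default_factory=list)
    last_seq: Dict[int, int] = field(default_factory=dict)  # epoch -> last sequence seen

    def produce(self, epoch: int, seq: int, payload: str) -> bool:
        if seq <= self.last_seq.get(epoch, 0):
            return False                     # treated as a duplicate and dropped
        self.last_seq[epoch] = seq
        self.log.append(payload)
        return True


def replay(bump_on_init: bool) -> List[str]:
    coord, part = ToyCoordinator(bump_on_init), ToyPartition()
    e1 = coord.init_producer_id()            # step 1 (instance1)
    e2 = coord.init_producer_id()            # step 2 (instance2)
    e1 = coord.commit_txn(e1)                # step 3
    e2 = coord.commit_txn(e2)                # step 4
    part.produce(e1, 1, "messageA")          # step 5
    part.produce(e2, 1, "messageB")          # step 6
    part.produce(e2, 2, "messageC")          # step 7
    part.produce(e1, 2, "messageD")          # step 8
    return part.log
```

Under these assumptions `replay(False)` returns a log containing messageA and messageC, the mix described above, while `replay(True)` raises `Fenced` when instance1 tries to commit at step 3.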

In order to update epoch in InitProducerId(keepPreparedTxn=true) we need
to preserve the ongoing transaction's epoch (and producerId, if the epoch
overflows), because we'd need to make a correct decision when we compare
the PreparedTxnState that we read from the database with the (producerId,
epoch) of the ongoing transaction.
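
The comparison described above can be sketched as follows, assuming the decision is "commit if the state stored in the database matches the ongoing transaction, abort otherwise". The names (`ProducerIdAndEpoch`, `InitResult`, `init_keep_prepared`, `decide`) and the overflow rule (switch to a fresh producer id once the epoch cannot be incremented any more) are assumptions made for illustration, not the KIP's definitions.

```python
from typing import NamedTuple

SHORT_MAX = 32767  # the epoch is a 16-bit signed value


class ProducerIdAndEpoch(NamedTuple):
    producer_id: int
    epoch: int


class InitResult(NamedTuple):
    ongoing: ProducerIdAndEpoch   # identifies the preserved (prepared) transaction
    current: ProducerIdAndEpoch   # what new operations are expected to carry


def init_keep_prepared(ongoing: ProducerIdAndEpoch, next_producer_id: int) -> InitResult:
    """Bump the epoch but remember the pair that the ongoing transaction used."""
    if ongoing.epoch >= SHORT_MAX - 1:
        # Assumed overflow handling: move to a fresh producer id.
        current = ProducerIdAndEpoch(next_producer_id, 0)
    else:
        current = ProducerIdAndEpoch(ongoing.producer_id, ongoing.epoch + 1)
    return InitResult(ongoing, current)


def decide(prepared_in_db: ProducerIdAndEpoch, result: InitResult) -> str:
    """Compare the state read from the database with the preserved pair."""
    return "commit" if prepared_in_db == result.ongoing else "abort"
```

Comparing against `result.current` instead would never match after a bump, which is why the sketch keeps both pairs.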

I've updated the KIP with the following:

   - Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair
   describes the ongoing transaction, the other pair describes expected
   epoch for operations on this transactional id
   - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
   - TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
   values added as tagged fields, so it's easy to downgrade
   - Added a note about downgrade in the Compatibility section
   - Added a rejected alternative

-Artem

On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io>
wrote:

Hi Justine,

Thank you for the questions. Currently (pre-KIP-939) we always bump the
epoch on InitProducerId and abort an ongoing transaction (if any). I
expect this behavior will continue with KIP-890 as well.

With KIP-939 we need to support the case when the ongoing transaction
needs to be preserved when keepPreparedTxn=true. Bumping epoch without
aborting or committing a transaction is tricky because epoch is a short
value and it's easy to overflow. Currently, the overflow case is handled
by aborting the ongoing transaction, which would send out transaction
markers with epoch=Short.MAX_VALUE to the partition leaders, which would
fence off any messages with the producer id that started the transaction
(they would have epoch that is less than Short.MAX_VALUE). Then it is
safe to allocate a new producer id and use it in new transactions.

We could say that maybe when keepPreparedTxn=true we bump epoch unless it
leads to overflow, and don't bump epoch in the overflow case. I don't
think it's a good solution because if it's not safe to keep the same epoch
when keepPreparedTxn=true, then we must handle the epoch overflow case as
well. So either we should convince ourselves that it's safe to keep the
epoch and do it in the general case, or we always bump the epoch and
handle the overflow.

With KIP-890, we bump the epoch on every transaction commit / abort. This
guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
increment epoch on the ongoing transaction, the client will have to call
commit or abort to finish the transaction and will increment the epoch
(and handle epoch overflow, if needed). If the ongoing transaction was in
a bad state and had some zombies waiting to arrive, the abort operation
would fence them because with KIP-890 every abort would bump the epoch.

We could also look at this from the following perspective. With KIP-890,
zombies won't be able to cross transaction boundaries; each transaction
completion creates a boundary and any activity in the past gets confined
in the boundary. Then data in any partition would look like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
4. marker (commit or abort), epoch=43

Now if we inject steps 3a and 3b like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
3a. crash
3b. InitProducerId(keepPreparedTxn=true)
4. marker (commit or abort), epoch=43

The invariant still holds even with steps 3a and 3b -- whatever activity
was in the past will get confined in the past with mandatory abort /
commit that must follow InitProducerId(keepPreparedTxn=true).

So KIP-890 provides the proper isolation between transactions, so
injecting crash + InitProducerId(keepPreparedTxn=true) into the
transaction sequence is safe from the zombie protection perspective.
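
The boundary argument can be illustrated with a toy partition log that rejects anything older than the latest epoch it has seen. This is a sketch, assuming the partition tracks a single producer id and that the marker carries the bumped epoch; `ToyLog` and `run` are hypothetical names, not broker code.

```python
from typing import List, Tuple


class ToyLog:
    """One partition, one producer id; remembers the highest epoch seen."""

    def __init__(self) -> None:
        self.entries: List[Tuple[str, int]] = []
        self.latest_epoch = -1

    def append(self, what: str, epoch: int) -> bool:
        if epoch < self.latest_epoch:
            return False                 # stale epoch: the write is fenced
        self.latest_epoch = epoch
        self.entries.append((what, epoch))
        return True


def run(inject_crash_and_init: bool) -> Tuple[List[Tuple[str, int]], bool]:
    log = ToyLog()
    for name in ("message1", "message2", "message3"):
        log.append(name, 42)
    if inject_crash_and_init:
        # Steps 3a and 3b: the crash and InitProducerId(keepPreparedTxn=true)
        # write nothing to the partition and, in this model, keep epoch 42.
        pass
    log.append("marker", 43)             # the mandatory commit or abort bumps the epoch
    zombie_accepted = log.append("late message", 42)
    return log.entries, zombie_accepted
```

In this model the partition contents are identical with and without steps 3a and 3b, and a late epoch=42 write arriving after the marker is rejected in both cases.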

That said, I'm still thinking about it and looking for cases that might
break because we don't bump epoch when
InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need to
develop the logic to handle epoch overflow for ongoing transactions.

-Artem

On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Hey Artem,

Thanks for the KIP. I had a question about epoch bumping.

Previously when we send an InitProducerId request on Producer startup, we
bump the epoch and abort the transaction. Is it correct to assume that we
will still bump the epoch, but just not abort the transaction?
If we still bump the epoch in this case, how does this interact with
KIP-890 where we also bump the epoch on every transaction? (I think this
means that we may skip epochs and the data itself will all have the same
epoch)

I may have follow ups depending on the answer to this. :)

Thanks,
Justine

On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Alex,

Thank you for your questions.

> the purpose of having broker-level transaction.two.phase.commit.enable

The thinking is that 2PC is a bit of an advanced construct so enabling 2PC
in a Kafka cluster should be an explicit decision. If it is set to
'false', InitProducerId (and initTransactions) would return
TRANSACTIONAL_ID_AUTHORIZATION_FAILED.

> WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable

I wonder if the client could just try to use 2PC and then handle the error
(e.g. if it needs to fall back to ordinary transactions). This way it
could uniformly handle cases when Kafka cluster doesn't support 2PC
completely and cases when 2PC is restricted to certain users. We could
also expose this config in describeConfigs, if the fallback approach
doesn't work for some scenarios.

-Artem

On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hi Artem,

Thanks for publishing this KIP!

Can you please clarify the purpose of having broker-level
transaction.two.phase.commit.enable config in addition to the new ACL? If
the brokers are configured with transaction.two.phase.commit.enable=false,
at what point will a client configured with
transaction.two.phase.commit.enable=true fail? Will it happen at
KafkaProducer#initTransactions?

WDYT about adding an AdminClient method that returns the state of
transaction.two.phase.commit.enable? This way, clients would know in
advance if 2PC is enabled on the brokers.

Best,
Alex

On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
wrote:

Other than supporting multiplexing transactional streams on a single
producer, I don't see how to improve it.

On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for summarizing the cons.
I agree and I'm curious what would be
the alternatives to solve these problems better and if they can be
incorporated into this proposal (or built independently in addition to or
on top of this proposal). E.g. one potential extension we discussed
earlier in the thread could be multiplexing logical transactional
"streams" with a single producer.

-Artem

On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
wrote:

Thanks. I like that you're moving Kafka toward supporting this dual-write
pattern. Each use case needs to consider the tradeoffs. You already
summarized the pros very well in the KIP.
I would summarize the cons as follows:

- you sacrifice availability - each write requires both DB and Kafka to be
available so I think your overall application availability is
(1 - p(DB is unavailable))*(1 - p(Kafka is unavailable)).
- latency will be higher and throughput lower - each write requires both
writes to DB and Kafka while holding an exclusive lock in DB.
- you need to create a producer per unit of concurrency in your app which
has some overhead in the app and Kafka side (number of connections, poor
batching). I assume the producers would need to be configured for low
latency (linger.ms=0)
- there's some complexity in managing stable transactional ids for each
producer/concurrency unit in your application. With k8s deployment, you
may need to switch to something like a StatefulSet that gives each pod a
stable identity across restarts. On top of that pod identity which you
can use as a prefix, you then assign unique transactional ids to each
concurrency unit (thread/goroutine).

On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for the feedback.
> > You make a very good point that we also discussed internally. Adding
> > support for multiple concurrent transactions in one producer could be
> > valuable but it seems to be a fairly large and independent change that
> > would deserve a separate KIP. If such support is added we could modify
> > 2PC functionality to incorporate that.
> >
> > > Maybe not too bad but a bit of pain to manage these ids inside each
> > > process and across all application processes.
> >
> > I'm not sure if supporting multiple transactions in one producer would
> > make id management simpler: we'd need to store a piece of data per
> > transaction, so whether it's N producers with a single transaction or N
> > transactions with a single producer, it's still roughly the same amount
> > of data to manage. In fact, managing transactional ids (current proposal)
> > might be easier, because the id is controlled by the application and it
> > knows how to complete the transaction after crash / restart; while a TID
> > would be generated by Kafka and that would create a question of starting
> > a Kafka transaction, but not saving its TID and then crashing, then
> > figuring out which transactions to abort, etc.
> >
> > > 2) creating a separate producer for each concurrency slot in the
> > > application
> >
> > This is a very valid concern. Maybe we'd need to have some multiplexing
> > of transactional logical "streams" over the same connection. Seems like
> > a separate KIP, though.
> >
> > > Otherwise, it seems you're left with single-threaded model per
> > > application process?
> >
> > That's a fair assessment.
> > Not necessarily exactly single-threaded per application, but a single
> > producer per thread model (i.e. an application could have a pool of
> > threads + producers to increase concurrency).
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
> > wrote:
> >
> > > Artem,
> > >
> > > Thanks for the reply.
> > >
> > > If I understand correctly, Kafka does not support concurrent
> > > transactions from the same producer (transactional id). I think this
> > > means that applications that want to support in-process concurrency
> > > (say thread-level concurrency with row-level DB locking) would need to
> > > manage separate transactional ids and producers per thread and then
> > > store txn state accordingly.
> > > The potential usability downsides I see are
> > > 1) managing a set of transactional ids for each application process
> > > that scales up to its max concurrency. Maybe not too bad but a bit of
> > > pain to manage these ids inside each process and across all
> > > application processes.
> > > 2) creating a separate producer for each concurrency slot in the
> > > application - this could create a lot more producers and resultant
> > > connections to Kafka than the typical model of a single producer per
> > > process.
> > >
> > > Otherwise, it seems you're left with single-threaded model per
> > > application process?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Roger, Arjun,
> > > >
> > > > Thank you for the questions.
> > > > > It looks like the application must have stable transactional ids
> > > > > over time?
> > > >
> > > > The transactional id should uniquely identify a producer instance
> > > > and needs to be stable across the restarts.
> > > > If the transactional id is not stable across restarts, then zombie
> > > > messages from a previous incarnation of the producer may violate
> > > > atomicity. If there are 2 producer instances concurrently producing
> > > > data with the same transactional id, they are going to constantly
> > > > fence each other and most likely make little or no progress.
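To make the two points above concrete (ids that stay stable across restarts, and fencing of older incarnations), here is a minimal sketch. It is not Kafka's API: `transactional_id`, `ToyCoordinator`, `FencedError`, and the pod name `orders-app-0` are all hypothetical names made up for illustration. The toy coordinator only mimics the epoch-bump idea, assuming each (re)initialization with the same transactional id invalidates the previous holder.

```python
from dataclasses import dataclass, field


def transactional_id(pod_identity: str, slot: int) -> str:
    """Derive a stable id from a stable pod name and a concurrency slot index."""
    if slot < 0:
        raise ValueError("slot must be non-negative")
    return f"{pod_identity}-txn-{slot}"


class FencedError(Exception):
    """Raised when a producer with a stale epoch tries to write."""


@dataclass
class ToyCoordinator:
    """Toy stand-in for a transaction coordinator (hypothetical, not Kafka's API)."""
    epochs: dict = field(default_factory=dict)

    def init_producer(self, txn_id: str) -> int:
        # Each (re)initialization bumps the epoch, fencing older incarnations.
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def send(self, txn_id: str, epoch: int) -> None:
        # Only the most recent epoch for this id is allowed to write.
        if epoch != self.epochs.get(txn_id):
            raise FencedError(f"{txn_id}: epoch {epoch} is stale")


coordinator = ToyCoordinator()

# One id per concurrency slot, all derived from the same stable pod identity.
ids = [transactional_id("orders-app-0", slot) for slot in range(3)]

# A restarted process derives the same id, so it fences its own zombie.
old_epoch = coordinator.init_producer(ids[0])
new_epoch = coordinator.init_producer(transactional_id("orders-app-0", 0))
try:
    coordinator.send(ids[0], old_epoch)
    zombie_fenced = False
except FencedError:
    zombie_fenced = True
coordinator.send(ids[0], new_epoch)  # the new incarnation proceeds
```

The same model also shows why two live instances sharing one id make little progress: each call to `init_producer` by one of them would invalidate the epoch held by the other.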
> > > >
> > > > The name might be a little bit confusing as it may be mistaken for a
> > > > transaction id / TID that uniquely identifies every transaction. The
> > > > name and the semantics were defined in the original
> > > > exactly-once-semantics (EoS) proposal (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > ) and KIP-939 just builds on top of that.
> > > >
> > > > > I'm curious to understand what happens if the producer dies, and
> > > > > does not come up and recover the pending transaction within the
> > > > > transaction timeout interval.
> > > >
> > > > If the producer / application never comes back, the transaction will
> > > > remain in prepared (a.k.a. "in-doubt") state until an operator
> > > > forcefully terminates the transaction. That's why a new ACL is
> > > > defined in this proposal: this functionality should only be provided
> > > > to applications that implement proper recovery logic.
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > <arjun.sat...@gmail.com> wrote:
> > > >
> > > > > Hello Artem,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > I have the same question as Roger on concurrent writes, and an
> > > > > additional one on consumer behavior. Typically, transactions will
> > > > > time out if not committed within some time interval. With the
> > > > > proposed changes in this KIP, consumers cannot consume past the
> > > > > ongoing transaction.
> > > > > I'm curious to understand what happens if the producer dies, and
> > > > > does not come up and recover the pending transaction within the
> > > > > transaction timeout interval. Or are we saying that when used in
> > > > > this 2PC context, we should configure these transaction timeouts
> > > > > to very large durations?
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Best,
> > > > > Arjun
> > > > >
> > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > >
> > > > > > Thanks for writing this KIP. Can you clarify the requirements a
> > > > > > bit more for managing transaction state? It looks like the
> > > > > > application must have stable transactional ids over time?
> > > > > > What is the granularity of those ids and producers? Say the
> > > > > > application is a multi-threaded Java web server, can/should all
> > > > > > the concurrent threads share a transactional id and producer?
> > > > > > That doesn't seem right to me unless the application is using
> > > > > > global DB locks that serialize all requests.
> > > > > > Instead, if the application uses row-level DB locks, there could
> > > > > > be multiple, concurrent, independent txns happening in the same
> > > > > > JVM so it seems like the granularity of managing transactional
> > > > > > ids and txn state needs to line up with the granularity of the
> > > > > > DB locking.
> > > > > >
> > > > > > Does that make sense or am I misunderstanding?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > This is a discussion thread for
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > .
> > > > > > >
> > > > > > > The KIP proposes extending Kafka transaction support (that
> > > > > > > already uses 2PC under the hood) to enable atomicity of dual
> > > > > > > writes to Kafka and an external database, and helps to fix a
> > > > > > > long standing Flink issue.
>> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > An example of >> > code >> > > > > that >> > > > > > > uses >> > > > > > > > > the >> > > > > > > > > > > > dual >> > > > > > > > > > > > > > > write >> > > > > > > > > > > > > > > > > > recipe >> > > > > > > > > > > > > > > > > > > >> with >> > > > > > > > > > > > > > > > > > > >> > > > JDBC >> > > > > > > > > > > > > > > > > > > >> > > > > > and >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > should >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > work for most >> > SQL >> > > > > > > databases >> > > > > > > > is >> > > > > > > > > > > here >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/14231. >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > The FLIP for >> the >> > > > > sister >> > > > > > > fix >> > > > > > > > in >> > > > > > > > > > > Flink >> > > > > > > > > > > > > is >> > > > > > > > > > > > > > > here >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > >> > > > > > > > > > > > > > > > > > > >> > > > >> > > > > > > > > > > > > > > > > > > >> > > >> > > > > > > > > > > > > > > > > > > >> > >> > > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > 
> > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710 >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > -Artem >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > >> > > > > > > > > > > > > > > > > > > >> > > > >> > > > > > > > > > > > > > > > > > > >> > > >> > > > > > > > > > > > > > > > > > > >> > >> > > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> >