Hi Justine,

Thank you for the questions.  Currently (pre-KIP-939) we always bump the
epoch on InitProducerId and abort an ongoing transaction (if any).  I
expect this behavior will continue with KIP-890 as well.

With KIP-939 we need to support the case where the ongoing transaction must
be preserved, i.e. keepPreparedTxn=true.  Bumping the epoch without aborting
or committing a transaction is tricky because the epoch is a short value and
it's easy to overflow.  Currently, the overflow case is handled by aborting
the ongoing transaction, which would send out transaction markers with
epoch=Short.MAX_VALUE to the partition leaders, which would fence off any
messages with the producer id that started the transaction (they would have
epoch that is less than Short.MAX_VALUE).  Then it is safe to allocate a
new producer id and use it in new transactions.
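
To make the overflow handling described above concrete, here is a minimal
Java sketch.  It is an illustration under simplifying assumptions, not the
coordinator code: the class, method and parameter names are hypothetical,
it assumes there is always an ongoing transaction to abort, and treating
Short.MAX_VALUE - 1 as the "exhausted" threshold is my assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of InitProducerId epoch handling; not the actual coordinator code.
final class EpochOverflowSketch {

    // Stand-in for the (producer id, epoch) pair tracked per transactional id.
    static final class ProducerState {
        final long producerId;
        final short epoch;

        ProducerState(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Bumps the epoch.  On overflow, fences the old producer id with an abort
    // marker at Short.MAX_VALUE and switches to freshProducerId at epoch 0.
    // abortMarkerEpochs collects the epochs of markers written for the OLD id.
    static ProducerState initProducerId(ProducerState current,
                                        long freshProducerId,
                                        List<Short> abortMarkerEpochs) {
        // Assumption: the epoch counts as exhausted one step before the maximum.
        if (current.epoch < Short.MAX_VALUE - 1) {
            short bumped = (short) (current.epoch + 1);
            abortMarkerEpochs.add(bumped);
            return new ProducerState(current.producerId, bumped);
        }
        // Overflow: every message of the old id has an epoch below
        // Short.MAX_VALUE, so a marker at Short.MAX_VALUE fences all of them.
        abortMarkerEpochs.add(Short.MAX_VALUE);
        return new ProducerState(freshProducerId, (short) 0);
    }

    public static void main(String[] args) {
        List<Short> markers = new ArrayList<>();
        ProducerState normal =
                initProducerId(new ProducerState(7L, (short) 42), 8L, markers);
        ProducerState rolled = initProducerId(
                new ProducerState(7L, (short) (Short.MAX_VALUE - 1)), 8L, markers);
        System.out.println(normal.producerId + "/" + normal.epoch);
        System.out.println(rolled.producerId + "/" + rolled.epoch);
        System.out.println(markers);
    }
}
```

The point of the sketch is that the overflow path only works because the
transaction is aborted; with keepPreparedTxn=true there is no marker to
write, which is why the overflow case needs separate thought.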

We could say that maybe when keepPreparedTxn=true we bump epoch unless it
leads to overflow, and don't bump epoch in the overflow case.  I don't
think it's a good solution because if it's not safe to keep the same epoch
when keepPreparedTxn=true, then we must handle the epoch overflow case as
well.  So either we should convince ourselves that it's safe to keep the
epoch and do it in the general case, or we always bump the epoch and handle
the overflow.

With KIP-890, we bump the epoch on every transaction commit / abort.  This
guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
increment epoch on the ongoing transaction, the client will have to call
commit or abort to finish the transaction and will increment the epoch (and
handle epoch overflow, if needed).  If the ongoing transaction was in a bad
state and had some zombies waiting to arrive, the abort operation would
fence them because with KIP-890 every abort would bump the epoch.

We could also look at this from the following perspective.  With KIP-890,
zombies won't be able to cross transaction boundaries; each transaction
completion creates a boundary and any activity in the past gets confined in
the boundary.  Then data in any partition would look like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
4. marker (commit or abort), epoch=43

Now if we inject steps 3a and 3b like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
3a. crash
3b. InitProducerId(keepPreparedTxn=true)
4. marker (commit or abort), epoch=43

The invariant still holds even with steps 3a and 3b -- whatever activity
was in the past will get confined in the past by the mandatory abort / commit
that must follow InitProducerId(keepPreparedTxn=true).

KIP-890 thus provides proper isolation between transactions, so injecting
crash + InitProducerId(keepPreparedTxn=true) into the transaction sequence
is safe from the zombie protection perspective.
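
As an illustration of the boundary argument, here is a small Java sketch of
a partition that rejects anything with an epoch older than the latest one
it has seen for a producer id.  This is a simplified model with hypothetical
names, not the broker's validation logic; it ignores sequence numbers and
everything else the real log checks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-partition epoch fencing; not the actual broker code.
final class ZombieFencingSketch {

    // Highest epoch seen so far for each producer id on this partition.
    private final Map<Long, Short> latestEpoch = new HashMap<>();

    // Accepts a message or marker unless its epoch is older than the latest seen.
    boolean append(long producerId, short epoch) {
        Short seen = latestEpoch.get(producerId);
        if (seen != null && epoch < seen) {
            return false; // fenced: stale epoch
        }
        latestEpoch.put(producerId, epoch);
        return true;
    }

    public static void main(String[] args) {
        ZombieFencingSketch partition = new ZombieFencingSketch();
        long pid = 1L;
        System.out.println(partition.append(pid, (short) 42)); // message1
        System.out.println(partition.append(pid, (short) 42)); // message2
        System.out.println(partition.append(pid, (short) 42)); // message3
        // 3a/3b: crash + InitProducerId(keepPreparedTxn=true), epoch unchanged.
        System.out.println(partition.append(pid, (short) 43)); // marker
        System.out.println(partition.append(pid, (short) 42)); // late zombie
    }
}
```

In this model the late epoch=42 message is rejected once the epoch=43 marker
is in the log, regardless of whether steps 3a and 3b happened in between.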

That said, I'm still thinking about it and looking for cases that might
break because we don't bump the epoch on
InitProducerId(keepPreparedTxn=true).  If such cases exist, we'll need to
develop the logic to handle epoch overflow for ongoing transactions.

-Artem



On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan <jols...@confluent.io.invalid>
wrote:

> Hey Artem,
>
> Thanks for the KIP. I had a question about epoch bumping.
>
> Previously when we send an InitProducerId request on Producer startup, we
> bump the epoch and abort the transaction. Is it correct to assume that we
> will still bump the epoch, but just not abort the transaction?
> If we still bump the epoch in this case, how does this interact with
> KIP-890 where we also bump the epoch on every transaction. (I think this
> means that we may skip epochs and the data itself will all have the same
> epoch)
>
> I may have follow ups depending on the answer to this. :)
>
> Thanks,
> Justine
>
> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Alex,
> >
> > Thank you for your questions.
> >
> > > the purpose of having broker-level transaction.two.phase.commit.enable
> >
> > The thinking is that 2PC is a bit of an advanced construct, so enabling
> > 2PC in a Kafka cluster should be an explicit decision.  If it is set to
> > 'false', InitProducerId (and initTransactions) would
> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> >
> > > WDYT about adding an AdminClient method that returns the state of
> > > transaction.two.phase.commit.enable
> >
> > I wonder if the client could just try to use 2PC and then handle the
> > error (e.g. if it needs to fall back to ordinary transactions).  This way
> > it could uniformly handle cases when the Kafka cluster doesn't support
> > 2PC at all and cases when 2PC is restricted to certain users.  We could
> > also expose this config in describeConfigs, if the fallback approach
> > doesn't work for some scenarios.
> >
> > -Artem
> >
> >
> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> > <asorokou...@confluent.io.invalid> wrote:
> >
> > > Hi Artem,
> > >
> > > Thanks for publishing this KIP!
> > >
> > > > Can you please clarify the purpose of having broker-level
> > > > transaction.two.phase.commit.enable config in addition to the new ACL?
> > > > If the brokers are configured with
> > > > transaction.two.phase.commit.enable=false, at what point will a client
> > > > configured with transaction.two.phase.commit.enable=true fail? Will it
> > > > happen at KafkaProducer#initTransactions?
> > > >
> > > > WDYT about adding an AdminClient method that returns the state of
> > > > transaction.two.phase.commit.enable? This way, clients would know in
> > > > advance if 2PC is enabled on the brokers.
> > >
> > > Best,
> > > Alex
> > >
> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > Other than supporting multiplexing transactional streams on a single
> > > > producer, I don't see how to improve it.
> > > >
> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Roger,
> > > > >
> > > > > > Thank you for summarizing the cons.  I agree and I'm curious what
> > > > > > would be the alternatives to solve these problems better and if
> > > > > > they can be incorporated into this proposal (or built independently
> > > > > > in addition to or on top of this proposal).  E.g. one potential
> > > > > > extension we discussed earlier in the thread could be multiplexing
> > > > > > logical transactional "streams" with a single producer.
> > > > >
> > > > > -Artem
> > > > >
> > > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover
> > > > > > <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > > Thanks.  I like that you're moving Kafka toward supporting this
> > > > > > > dual-write pattern.  Each use case needs to consider the
> > > > > > > tradeoffs.  You already summarized the pros very well in the KIP.
> > > > > > > I would summarize the cons as follows:
> > > > > > >
> > > > > > > - you sacrifice availability - each write requires both DB and
> > > > > > > Kafka to be available, so I think your overall application
> > > > > > > availability is roughly
> > > > > > > p(DB is available)*p(Kafka is available).
> > > > > > > - latency will be higher and throughput lower - each write
> > > > > > > requires both writes to DB and Kafka while holding an exclusive
> > > > > > > lock in DB.
> > > > > > > - you need to create a producer per unit of concurrency in your
> > > > > > > app, which has some overhead on the app and Kafka side (number of
> > > > > > > connections, poor batching).  I assume the producers would need
> > > > > > > to be configured for low latency (linger.ms=0).
> > > > > > > - there's some complexity in managing stable transactional ids
> > > > > > > for each producer/concurrency unit in your application.  With a
> > > > > > > k8s deployment, you may need to switch to something like a
> > > > > > > StatefulSet that gives each pod a stable identity across
> > > > > > > restarts.  On top of that pod identity, which you can use as a
> > > > > > > prefix, you then assign unique transactional ids to each
> > > > > > > concurrency unit (thread/goroutine).
> > > > > >
> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Roger,
> > > > > > >
> > > > > > > > Thank you for the feedback.  You make a very good point that we
> > > > > > > > also discussed internally.  Adding support for multiple
> > > > > > > > concurrent transactions in one producer could be valuable but it
> > > > > > > > seems to be a fairly large and independent change that would
> > > > > > > > deserve a separate KIP.  If such support is added we could
> > > > > > > > modify 2PC functionality to incorporate that.
> > > > > > > >
> > > > > > > > > Maybe not too bad but a bit of pain to manage these ids inside
> > > > > > > > > each process and across all application processes.
> > > > > > > >
> > > > > > > > I'm not sure if supporting multiple transactions in one producer
> > > > > > > > would make id management simpler: we'd need to store a piece of
> > > > > > > > data per transaction, so whether it's N producers with a single
> > > > > > > > transaction or N transactions with a single producer, it's still
> > > > > > > > roughly the same amount of data to manage.  In fact, managing
> > > > > > > > transactional ids (current proposal) might be easier, because
> > > > > > > > the id is controlled by the application and it knows how to
> > > > > > > > complete the transaction after crash / restart; while a TID
> > > > > > > > would be generated by Kafka and that would create a question of
> > > > > > > > starting a Kafka transaction, but not saving its TID and then
> > > > > > > > crashing, then figuring out which transactions to abort, etc.
> > > > > > > >
> > > > > > > > > 2) creating a separate producer for each concurrency slot in
> > > > > > > > > the application
> > > > > > > >
> > > > > > > > This is a very valid concern.  Maybe we'd need to have some
> > > > > > > > multiplexing of transactional logical "streams" over the same
> > > > > > > > connection.  Seems like a separate KIP, though.
> > > > > > > >
> > > > > > > > > Otherwise, it seems you're left with single-threaded model per
> > > > > > > > > application process?
> > > > > > > >
> > > > > > > > That's a fair assessment.  Not necessarily exactly
> > > > > > > > single-threaded per application, but a single producer per
> > > > > > > > thread model (i.e. an application could have a pool of threads +
> > > > > > > > producers to increase concurrency).
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover
> > > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Artem,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > > If I understand correctly, Kafka does not support concurrent
> > > > > > > > > transactions from the same producer (transactional id).  I
> > > > > > > > > think this means that applications that want to support
> > > > > > > > > in-process concurrency (say thread-level concurrency with
> > > > > > > > > row-level DB locking) would need to manage separate
> > > > > > > > > transactional ids and producers per thread and then store txn
> > > > > > > > > state accordingly.  The potential usability downsides I see
> > > > > > > > > are
> > > > > > > > > 1) managing a set of transactional ids for each application
> > > > > > > > > process that scales up to its max concurrency.  Maybe not too
> > > > > > > > > bad but a bit of pain to manage these ids inside each process
> > > > > > > > > and across all application processes.
> > > > > > > > > 2) creating a separate producer for each concurrency slot in
> > > > > > > > > the application - this could create a lot more producers and
> > > > > > > > > resultant connections to Kafka than the typical model of a
> > > > > > > > > single producer per process.
> > > > > > > > >
> > > > > > > > > Otherwise, it seems you're left with a single-threaded model
> > > > > > > > > per application process?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Roger
> > > > > > > >
> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Roger, Arjun,
> > > > > > > > >
> > > > > > > > > Thank you for the questions.
> > > > > > > > > >
> > > > > > > > > > > It looks like the application must have stable
> > > > > > > > > > > transactional ids over time?
> > > > > > > > > >
> > > > > > > > > > The transactional id should uniquely identify a producer
> > > > > > > > > > instance and needs to be stable across restarts.  If the
> > > > > > > > > > transactional id is not stable across restarts, then zombie
> > > > > > > > > > messages from a previous incarnation of the producer may
> > > > > > > > > > violate atomicity.  If there are 2 producer instances
> > > > > > > > > > concurrently producing data with the same transactional id,
> > > > > > > > > > they are going to constantly fence each other and most
> > > > > > > > > > likely make little or no progress.
> > > > > > > > > >
> > > > > > > > > > The name might be a little bit confusing as it may be
> > > > > > > > > > mistaken for a transaction id / TID that uniquely identifies
> > > > > > > > > > every transaction.  The name and the semantics were defined
> > > > > > > > > > in the original exactly-once-semantics (EoS) proposal (
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > ) and KIP-939 just builds on top of that.
> > > > > > > > > >
> > > > > > > > > > > I'm curious to understand what happens if the producer
> > > > > > > > > > > dies, and does not come up and recover the pending
> > > > > > > > > > > transaction within the transaction timeout interval.
> > > > > > > > > >
> > > > > > > > > > If the producer / application never comes back, the
> > > > > > > > > > transaction will remain in prepared (a.k.a. "in-doubt")
> > > > > > > > > > state until an operator forcefully terminates the
> > > > > > > > > > transaction.  That's why a new ACL is defined in this
> > > > > > > > > > proposal -- this functionality should only be provided to
> > > > > > > > > > applications that implement proper recovery logic.
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > > > > > > > <arjun.sat...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello Artem,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP.
> > > > > > > > > >
> > > > > > > > > > > I have the same question as Roger on concurrent writes,
> > > > > > > > > > > and an additional one on consumer behavior. Typically,
> > > > > > > > > > > transactions will time out if not committed within some
> > > > > > > > > > > time interval. With the proposed changes in this KIP,
> > > > > > > > > > > consumers cannot consume past the ongoing transaction.
> > > > > > > > > > > I'm curious to understand what happens if the producer
> > > > > > > > > > > dies, and does not come up and recover the pending
> > > > > > > > > > > transaction within the transaction timeout interval. Or
> > > > > > > > > > > are we saying that when used in this 2PC context, we
> > > > > > > > > > > should configure these transaction timeouts to very large
> > > > > > > > > > > durations?
> > > > > > > > > >
> > > > > > > > > > Thanks in advance!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Arjun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Artem,
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for writing this KIP.  Can you clarify the
> > > > > > > > > > > > requirements a bit more for managing transaction state?
> > > > > > > > > > > > It looks like the application must have stable
> > > > > > > > > > > > transactional ids over time?  What is the granularity
> > > > > > > > > > > > of those ids and producers?  Say the application is a
> > > > > > > > > > > > multi-threaded Java web server, can/should all the
> > > > > > > > > > > > concurrent threads share a transactional id and
> > > > > > > > > > > > producer?  That doesn't seem right to me unless the
> > > > > > > > > > > > application is using global DB locks that serialize all
> > > > > > > > > > > > requests.  Instead, if the application uses row-level
> > > > > > > > > > > > DB locks, there could be multiple, concurrent,
> > > > > > > > > > > > independent txns happening in the same JVM so it seems
> > > > > > > > > > > > like the granularity of managing transactional ids and
> > > > > > > > > > > > txn state needs to line up with the granularity of the
> > > > > > > > > > > > DB locking.
> > > > > > > > > > >
> > > > > > > > > > > Does that make sense or am I misunderstanding?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Roger
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello,
> > > > > > > > > > > >
> > > > > > > > > > > > > This is a discussion thread for
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > > > > > > > .
> > > > > > > > > > > > >
> > > > > > > > > > > > > The KIP proposes extending Kafka transaction support
> > > > > > > > > > > > > (that already uses 2PC under the hood) to enable
> > > > > > > > > > > > > atomicity of dual writes to Kafka and an external
> > > > > > > > > > > > > database, and helps to fix a long-standing Flink
> > > > > > > > > > > > > issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > An example of code that uses the dual write recipe
> > > > > > > > > > > > > with JDBC and should work for most SQL databases is
> > > > > > > > > > > > > here
> > > > > > > > > > > > > https://github.com/apache/kafka/pull/14231.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The FLIP for the sister fix in Flink is here
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > > > > > > >
> > > > > > > > > > > > -Artem
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
