Hi Alex,

Thank you for your questions.

> the purpose of having broker-level transaction.two.phase.commit.enable

The thinking is that 2PC is a bit of an advanced construct, so enabling 2PC
in a Kafka cluster should be an explicit decision.  If it is set to 'false',
InitProducerId (and initTransactions) would
return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.

> WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable

I wonder if the client could just try to use 2PC and then handle the error
(e.g. if it needs to fall back to ordinary transactions).  This way it
could uniformly handle cases when the Kafka cluster doesn't support 2PC at
all and cases when 2PC is restricted to certain users.  We could
also expose this config in describeConfigs, if the fallback approach
doesn't work for some scenarios.
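
To make the fallback idea concrete, here is a rough sketch in Java.  It is
only an illustration under stated assumptions: TxnProducer,
TwoPhaseCommitNotAllowedException and the factory are hypothetical stand-ins
rather than Kafka client classes, and the two "cluster" methods merely
simulate a broker that rejects or accepts 2PC.  In a real client the failure
would surface as whatever exception the producer maps
TRANSACTIONAL_ID_AUTHORIZATION_FAILED to.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the client-side authorization failure; not a Kafka class. */
class TwoPhaseCommitNotAllowedException extends RuntimeException {
    TwoPhaseCommitNotAllowedException(String message) {
        super(message);
    }
}

/** Hypothetical, minimal view of a transactional producer (illustration only). */
interface TxnProducer {
    void initTransactions();
}

final class TwoPhaseCommitFallback {
    /** The initialized producer plus whether 2PC ended up enabled. */
    static final class Result {
        final TxnProducer producer;
        final boolean twoPhaseCommit;

        Result(TxnProducer producer, boolean twoPhaseCommit) {
            this.producer = producer;
            this.twoPhaseCommit = twoPhaseCommit;
        }
    }

    /**
     * Tries 2PC first; on an authorization failure, builds a second producer
     * with 2PC off. The factory argument is the desired 2PC setting.
     */
    static Result init(Function<Boolean, TxnProducer> factory) {
        TxnProducer twoPc = factory.apply(true);
        try {
            twoPc.initTransactions();
            return new Result(twoPc, true);
        } catch (TwoPhaseCommitNotAllowedException e) {
            // Same path whether the cluster has 2PC disabled or this
            // principal lacks the permission: use ordinary transactions.
            TxnProducer ordinary = factory.apply(false);
            ordinary.initTransactions();
            return new Result(ordinary, false);
        }
    }

    /** Simulated cluster that rejects 2PC for this client. */
    static TxnProducer restrictedCluster(boolean twoPhaseCommit) {
        return () -> {
            if (twoPhaseCommit) {
                throw new TwoPhaseCommitNotAllowedException("2PC not permitted");
            }
        };
    }

    /** Simulated cluster that accepts both modes. */
    static TxnProducer permissiveCluster(boolean twoPhaseCommit) {
        return () -> { };
    }
}
```

The caller only looks at Result.twoPhaseCommit to decide which commit path to
take, so "2PC disabled on the cluster" and "2PC not granted to this user" are
handled by the same code.  A real implementation would presumably also close
the first producer before creating the second one.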

-Artem


On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hi Artem,
>
> Thanks for publishing this KIP!
>
> Can you please clarify the purpose of having broker-level
> transaction.two.phase.commit.enable config in addition to the new ACL? If
> the brokers are configured with transaction.two.phase.commit.enable=false,
> at what point will a client configured with
> transaction.two.phase.commit.enable=true fail? Will it happen at
> KafkaProducer#initTransactions?
>
> WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable? This way, clients would know in advance
> if 2PC is enabled on the brokers.
>
> Best,
> Alex
>
> On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Other than supporting multiplexing transactional streams on a single
> > producer, I don't see how to improve it.
> >
> > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Roger,
> > >
> > > Thank you for summarizing the cons.  I agree and I'm curious what would be
> > > the alternatives to solve these problems better and if they can be
> > > incorporated into this proposal (or built independently in addition to or
> > > on top of this proposal).  E.g. one potential extension we discussed
> > > earlier in the thread could be multiplexing logical transactional "streams"
> > > with a single producer.
> > >
> > > -Artem
> > >
> > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > Thanks.  I like that you're moving Kafka toward supporting this
> > > > dual-write pattern.  Each use case needs to consider the tradeoffs.  You
> > > > already summarized the pros very well in the KIP.  I would summarize the
> > > > cons as follows:
> > > >
> > > > - you sacrifice availability - each write requires both DB and Kafka to
> > > > be available so I think your overall application availability is
> > > > (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
> > > > - latency will be higher and throughput lower - each write requires both
> > > > writes to DB and Kafka while holding an exclusive lock in DB.
> > > > - you need to create a producer per unit of concurrency in your app which
> > > > has some overhead in the app and Kafka side (number of connections, poor
> > > > batching).  I assume the producers would need to be configured for low
> > > > latency (linger.ms=0)
> > > > - there's some complexity in managing stable transactional ids for each
> > > > producer/concurrency unit in your application.  With k8s deployment, you
> > > > may need to switch to something like a StatefulSet that gives each pod a
> > > > stable identity across restarts.  On top of that pod identity which you
> > > > can use as a prefix, you then assign unique transactional ids to each
> > > > concurrency unit (thread/goroutine).
> > > >
> > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Roger,
> > > > >
> > > > > Thank you for the feedback.  You make a very good point that we also
> > > > > discussed internally.  Adding support for multiple concurrent
> > > > > transactions in one producer could be valuable but it seems to be a
> > > > > fairly large and independent change that would deserve a separate KIP.
> > > > > If such support is added we could modify 2PC functionality to
> > > > > incorporate that.
> > > > >
> > > > > > Maybe not too bad but a bit of pain to manage these ids inside each
> > > > > > process and across all application processes.
> > > > >
> > > > > I'm not sure if supporting multiple transactions in one producer would
> > > > > make id management simpler: we'd need to store a piece of data per
> > > > > transaction, so whether it's N producers with a single transaction or N
> > > > > transactions with a single producer, it's still roughly the same amount
> > > > > of data to manage.  In fact, managing transactional ids (current
> > > > > proposal) might be easier, because the id is controlled by the
> > > > > application and it knows how to complete the transaction after crash /
> > > > > restart; while a TID would be generated by Kafka and that would create a
> > > > > question of starting a Kafka transaction, but not saving its TID and
> > > > > then crashing, then figuring out which transactions to abort, etc.
> > > > >
> > > > > > 2) creating a separate producer for each concurrency slot in the
> > > > > > application
> > > > >
> > > > > This is a very valid concern.  Maybe we'd need to have some
> > > > > multiplexing of transactional logical "streams" over the same
> > > > > connection.  Seems like a separate KIP, though.
> > > > >
> > > > > > Otherwise, it seems you're left with single-threaded model per
> > > > > > application process?
> > > > >
> > > > > That's a fair assessment.  Not necessarily exactly single-threaded per
> > > > > application, but a single producer per thread model (i.e. an
> > > > > application could have a pool of threads + producers to increase
> > > > > concurrency).
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Artem,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > If I understand correctly, Kafka does not support concurrent
> > > > > > transactions from the same producer (transactional id).  I think this
> > > > > > means that applications that want to support in-process concurrency
> > > > > > (say thread-level concurrency with row-level DB locking) would need to
> > > > > > manage separate transactional ids and producers per thread and then
> > > > > > store txn state accordingly.  The potential usability downsides I see
> > > > > > are
> > > > > > 1) managing a set of transactional ids for each application process
> > > > > > that scales up to its max concurrency.  Maybe not too bad but a bit of
> > > > > > pain to manage these ids inside each process and across all
> > > > > > application processes.
> > > > > > 2) creating a separate producer for each concurrency slot in the
> > > > > > application - this could create a lot more producers and resultant
> > > > > > connections to Kafka than the typical model of a single producer per
> > > > > > process.
> > > > > >
> > > > > > Otherwise, it seems you're left with single-threaded model per
> > > > > > application process?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Roger, Arjun,
> > > > > > >
> > > > > > > Thank you for the questions.
> > > > > > >
> > > > > > > > It looks like the application must have stable transactional ids
> > > > > > > > over time?
> > > > > > >
> > > > > > > The transactional id should uniquely identify a producer instance
> > > > > > > and needs to be stable across the restarts.  If the transactional id
> > > > > > > is not stable across restarts, then zombie messages from a previous
> > > > > > > incarnation of the producer may violate atomicity.  If there are 2
> > > > > > > producer instances concurrently producing data with the same
> > > > > > > transactional id, they are going to constantly fence each other and
> > > > > > > most likely make little or no progress.
> > > > > > >
> > > > > > > The name might be a little bit confusing as it may be mistaken for a
> > > > > > > transaction id / TID that uniquely identifies every transaction.  The
> > > > > > > name and the semantics were defined in the original
> > > > > > > exactly-once-semantics (EoS) proposal (
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > )
> > > > > > > and KIP-939 just builds on top of that.
> > > > > > >
> > > > > > > > I'm curious to understand what happens if the producer dies, and
> > > > > > > > does not come up and recover the pending transaction within the
> > > > > > > > transaction timeout interval.
> > > > > > >
> > > > > > > If the producer / application never comes back, the transaction will
> > > > > > > remain in prepared (a.k.a. "in-doubt") state until an operator
> > > > > > > forcefully terminates the transaction.  That's why a new ACL is
> > > > > > > defined in this proposal -- this functionality should only be
> > > > > > > provided to applications that implement proper recovery logic.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Artem,
> > > > > > > >
> > > > > > > > Thanks for the KIP.
> > > > > > > >
> > > > > > > > I have the same question as Roger on concurrent writes, and an
> > > > > > > > additional one on consumer behavior. Typically, transactions will
> > > > > > > > time out if not committed within some time interval. With the
> > > > > > > > proposed changes in this KIP, consumers cannot consume past the
> > > > > > > > ongoing transaction. I'm curious to understand what happens if the
> > > > > > > > producer dies, and does not come up and recover the pending
> > > > > > > > transaction within the transaction timeout interval. Or are we
> > > > > > > > saying that when used in this 2PC context, we should configure these
> > > > > > > > transaction timeouts to very large durations?
> > > > > > > >
> > > > > > > > Thanks in advance!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Arjun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > >
> > > > > > > > > Thanks for writing this KIP.  Can you clarify the requirements a bit
> > > > > > > > > more for managing transaction state?  It looks like the application
> > > > > > > > > must have stable transactional ids over time?  What is the
> > > > > > > > > granularity of those ids and producers?  Say the application is a
> > > > > > > > > multi-threaded Java web server, can/should all the concurrent
> > > > > > > > > threads share a transactional id and producer?  That doesn't seem
> > > > > > > > > right to me unless the application is using global DB locks that
> > > > > > > > > serialize all requests.  Instead, if the application uses row-level
> > > > > > > > > DB locks, there could be multiple, concurrent, independent txns
> > > > > > > > > happening in the same JVM so it seems like the granularity of
> > > > > > > > > managing transactional ids and txn state needs to line up with the
> > > > > > > > > granularity of the DB locking.
> > > > > > > > >
> > > > > > > > > Does that make sense or am I misunderstanding?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Roger
> > > > > > > > >
> > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hello,
> > > > > > > > > >
> > > > > > > > > > This is a discussion thread for
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > > > > .
> > > > > > > > > >
> > > > > > > > > > The KIP proposes extending Kafka transaction support (that already
> > > > > > > > > > uses 2PC under the hood) to enable atomicity of dual writes to
> > > > > > > > > > Kafka and an external database, and helps to fix a long standing
> > > > > > > > > > Flink issue.
> > > > > > > > > >
> > > > > > > > > > An example of code that uses the dual write recipe with JDBC and
> > > > > > > > > > should work for most SQL databases is here
> > > > > > > > > > https://github.com/apache/kafka/pull/14231.
> > > > > > > > > >
> > > > > > > > > > The FLIP for the sister fix in Flink is here
> > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
