Hi Roger,

Thank you for the feedback.  You make a very good point that we also
discussed internally.  Adding support for multiple concurrent
transactions in one producer could be valuable but it seems to be a fairly
large and independent change that would deserve a separate KIP.  If such
support is added we could modify 2PC functionality to incorporate that.

> Maybe not too bad but a bit of pain to manage these ids inside each
process and across all application processes.

I'm not sure if supporting multiple transactions in one producer would make
id management simpler: we'd need to store a piece of data per transaction,
so whether it's N producers with a single transaction or N transactions
with a single producer, it's still roughly the same amount of data to
manage.  In fact, managing transactional ids (current proposal) might be
easier, because the id is controlled by the application and it knows how to
complete the transaction after crash / restart; while a TID would be
generated by Kafka and that would create a question of starting Kafka
transaction, but not saving its TID and then crashing, then figuring out
which transactions to abort and etc.

> 2) creating a separate producer for each concurrency slot in the
application

This is a very valid concern.  Maybe we'd need to have some multiplexing of
transactional logical "streams" over the same connection.  Seems like a
separate KIP, though.

> Otherwise, it seems you're left with single-threaded model per
application process?

That's a fair assessment.  Not necessarily exactly single-threaded per
application, but a single producer per thread model (i.e. an application
could have a pool of threads + producers to increase concurrency).

-Artem

On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

> Artem,
>
> Thanks for the reply.
>
> If I understand correctly, Kafka does not support concurrent transactions
> from the same producer (transactional id).  I think this means that
> applications that want to support in-process concurrency (say thread-level
> concurrency with row-level DB locking) would need to manage separate
> transactional ids and producers per thread and then store txn state
> accordingly.   The potential usability downsides I see are
> 1) managing a set of transactional ids for each application process that
> scales up to it's max concurrency.  Maybe not too bad but a bit of pain to
> manage these ids inside each process and across all application processes.
> 2) creating a separate producer for each concurrency slot in the
> application - this could create a lot more producers and resultant
> connections to Kafka than the typical model of a single producer per
> process.
>
> Otherwise, it seems you're left with single-threaded model per application
> process?
>
> Thanks,
>
> Roger
>
> On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Roger, Arjun,
> >
> > Thank you for the questions.
> > > It looks like the application must have stable transactional ids over
> > time?
> >
> > The transactional id should uniquely identify a producer instance and
> needs
> > to be stable across the restarts.  If the transactional id is not stable
> > across restarts, then zombie messages from a previous incarnation of the
> > producer may violate atomicity.  If there are 2 producer instances
> > concurrently producing data with the same transactional id, they are
> going
> > to constantly fence each other and most likely make little or no
> progress.
> >
> > The name might be a little bit confusing as it may be mistaken for a
> > transaction id / TID that uniquely identifies every transaction.  The
> name
> > and the semantics were defined in the original exactly-once-semantics
> (EoS)
> > proposal (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > )
> > and KIP-939 just build on top of that.
> >
> > > I'm curious to understand what happens if the producer dies, and does
> not
> > come up and recover the pending transaction within the transaction
> timeout
> > interval.
> >
> > If the producer / application never comes back, the transaction will
> remain
> > in prepared (a.k.a. "in-doubt") state until an operator forcefully
> > terminates the transaction.  That's why there is a new ACL is defined in
> > this proposal -- this functionality should only provided to applications
> > that implement proper recovery logic.
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com>
> > wrote:
> >
> > > Hello Artem,
> > >
> > > Thanks for the KIP.
> > >
> > > I have the same question as Roger on concurrent writes, and an
> additional
> > > one on consumer behavior. Typically, transactions will timeout if not
> > > committed within some time interval. With the proposed changes in this
> > KIP,
> > > consumers cannot consume past the ongoing transaction. I'm curious to
> > > understand what happens if the producer dies, and does not come up and
> > > recover the pending transaction within the transaction timeout
> interval.
> > Or
> > > are we saying that when used in this 2PC context, we should configure
> > these
> > > transaction timeouts to very large durations?
> > >
> > > Thanks in advance!
> > >
> > > Best,
> > > Arjun
> > >
> > >
> > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > Hi Artem,
> > > >
> > > > Thanks for writing this KIP.  Can you clarify the requirements a bit
> > more
> > > > for managing transaction state?  It looks like the application must
> > have
> > > > stable transactional ids over time?   What is the granularity of
> those
> > > ids
> > > > and producers?  Say the application is a multi-threaded Java web
> > server,
> > > > can/should all the concurrent threads share a transactional id and
> > > > producer?  That doesn't seem right to me unless the application is
> > using
> > > > global DB locks that serialize all requests.  Instead, if the
> > application
> > > > uses row-level DB locks, there could be multiple, concurrent,
> > independent
> > > > txns happening in the same JVM so it seems like the granularity
> > managing
> > > > transactional ids and txn state needs to line up with granularity of
> > the
> > > DB
> > > > locking.
> > > >
> > > > Does that make sense or am I misunderstanding?
> > > >
> > > > Thanks,
> > > >
> > > > Roger
> > > >
> > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > This is a discussion thread for
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > .
> > > > >
> > > > > The KIP proposes extending Kafka transaction support (that already
> > uses
> > > > 2PC
> > > > > under the hood) to enable atomicity of dual writes to Kafka and an
> > > > external
> > > > > database, and helps to fix a long standing Flink issue.
> > > > >
> > > > > An example of code that uses the dual write recipe with JDBC and
> > should
> > > > > work for most SQL databases is here
> > > > > https://github.com/apache/kafka/pull/14231.
> > > > >
> > > > > The FLIP for the sister fix in Flink is here
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > >
> > > > > -Artem
> > > > >
> > > >
> > >
> >
>

Reply via email to