Thanks for taking a look at the proposals, Gyula and Alex!

@Alexander Sorokoumov <asorokou...@confluent.io> please see my responses
below. I've tagged each bullet point to match the tag of each of your
questions.


***KIP-939 specific questions:***
(1.) Your understanding is correct. The completeTransaction method
introduced by KIP-939 is essentially a conditional operation. KIP-939 needs
this additional variant to validate that dual writes of events to other
destinations match the transactional state in Kafka. If they match, Kafka
rolls the current transaction forward; otherwise, it rolls it back.
Note that Flink doesn't really need this variant - see point (3) below in
the FLIP-319 section.

(1.2) Consider an atomic dual write to Kafka + some other DB. For every
write, an app would write to Kafka, prepare the txn, and then write the
obtained PreparedTxnState to the DB with a transactional write. The
PreparedTxnState committed in the DB would differ from the one in Kafka if
that DB write failed, or if the app failed before the DB write completed.
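
To make the conditional behavior from (1.) and (1.2) concrete, here is a
minimal, self-contained sketch. It does not use the Kafka client at all: the
PreparedState class, the Outcome enum, and this completeTransaction method
are hypothetical stand-ins, and modelling the prepared state as a
(producerId, epoch) pair is an assumption made only for illustration.

```java
import java.util.Objects;

// Hypothetical stand-ins for illustration; NOT the real Kafka client classes.
class CompleteTransactionSketch {

    /** Simplified stand-in for PreparedTxnState, assumed to be a (producerId, epoch) pair. */
    static final class PreparedState {
        final long producerId;
        final short epoch;

        PreparedState(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PreparedState)) {
                return false;
            }
            PreparedState other = (PreparedState) o;
            return producerId == other.producerId && epoch == other.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(producerId, epoch);
        }
    }

    enum Outcome { COMMITTED, ABORTED }

    /** Roll forward only if the state stored in the DB matches the prepared txn in Kafka. */
    static Outcome completeTransaction(PreparedState preparedInKafka, PreparedState storedInDb) {
        return preparedInKafka.equals(storedInDb) ? Outcome.COMMITTED : Outcome.ABORTED;
    }

    public static void main(String[] args) {
        PreparedState previousTxn = new PreparedState(42L, (short) 6);
        PreparedState currentTxn = new PreparedState(42L, (short) 7);

        // DB write succeeded: the DB holds the state of the currently prepared txn.
        System.out.println(completeTransaction(currentTxn, new PreparedState(42L, (short) 7)));
        // DB write failed or the app crashed first: the DB still holds the older state.
        System.out.println(completeTransaction(currentTxn, previousTxn));
    }
}
```

In the first call the two states match, so the sketch rolls the txn forward;
in the second the DB never recorded the new state, so it rolls back.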

(5.) transaction.two.phase.commit.enable is a client-side-only property;
it is not set on brokers. When set on the client, the InitProducerId
request would have a few fields set to indicate that the initiating client
instance intends this to be a 2PC transaction. These fields will only be
recognized by brokers running versions that support KIP-939.
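
As a rough sketch of what setting this on the client could look like, using
only java.util.Properties: the class and method names and the values
"localhost:9092" and "my-2pc-app" are placeholders I made up; the property
key is the one discussed above.

```java
import java.util.Properties;

class TwoPhaseCommitClientConfig {

    /** Builds producer properties that opt this client instance into 2PC. */
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transactional.id", transactionalId);
        // Client-side switch described above. Only brokers that support
        // KIP-939 are expected to recognize the resulting request fields.
        props.setProperty("transaction.two.phase.commit.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = producerProps("localhost:9092", "my-2pc-app");
        System.out.println(props.getProperty("transaction.two.phase.commit.enable"));
    }
}
```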


***FLIP-319 questions:***
(2.) That is correct. All previous txns would have been either committed or
aborted before the Flink job starts processing data.

(3.) Flink doesn’t actually need to use completeTransaction at all - I
should probably touch on this in the FLIP to clarify. It **could** use it
as an extra layer of validation, but that validation should never fail; if
it did, there would be a fundamental bug in Flink’s checkpointing, or
KafkaSink would not be properly integrated with the checkpointing
lifecycle. Furthermore, if we wanted to use completeTransaction, we would
need to store PreparedTxnState in the checkpoints, which adds a little
overhead (though quite trivial).

(4.) That is not required. The new KafkaSink can completely remove the
FlinkKafkaInternalProducer and get rid of Java reflection for good. Users
upgrading to the new KafkaSink are expected to do the following:

a) First, complete an upgrade of their Kafka clusters to a minimum version
that supports KIP-939.
b) Then, take a savepoint of their existing job (using
stop-with-savepoint), which still uses an old KafkaSink version that uses
KafkaInternalProducer.
c) Restore from that savepoint with an upgraded version of the job that
uses the new version of KafkaSink, which does not use KafkaInternalProducer.

The act of taking a savepoint ---> restoring is essentially a state
migration, from the previous Committable schema of (tid, pid, epoch) to the
new schema, which is just (tid). The “not so nice bit” is that you can
upgrade the Flink job only once the Kafka cluster is fully migrated.
If you upgrade the Flink job early, and the new KafkaSink’s Kafka clients
hit an old version broker, the job would fail.
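
To illustrate the schema change only, here is a minimal sketch. The
OldCommittable and NewCommittable classes and the migrate method are
hypothetical names for this example, not the connector's actual classes or
serializers.

```java
// Hypothetical types for illustration; NOT the connector's real classes.
class CommittableMigrationSketch {

    /** Previous schema: (tid, pid, epoch). */
    static final class OldCommittable {
        final String transactionalId;
        final long producerId;
        final short epoch;

        OldCommittable(String transactionalId, long producerId, short epoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** New schema: just (tid). */
    static final class NewCommittable {
        final String transactionalId;

        NewCommittable(String transactionalId) {
            this.transactionalId = transactionalId;
        }
    }

    /** Keeps the transactional id and drops the producer id and epoch. */
    static NewCommittable migrate(OldCommittable old) {
        return new NewCommittable(old.transactionalId);
    }

    public static void main(String[] args) {
        // "sink-0-1" is a made-up transactional id.
        OldCommittable old = new OldCommittable("sink-0-1", 42L, (short) 7);
        System.out.println(migrate(old).transactionalId);
    }
}
```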

(6.) transaction.two.phase.commit.enable should NOT be user configurable
when a user uses the new KafkaSink - the sink always overwrites that
property before instantiating the Kafka producer client. Its value depends
on whether EOS is desired - if yes, it’s always true; otherwise it’s
false. So far, we've handled overwriting Kafka client properties passed in
by the user by simply logging a warning - do you think that is sufficient,
or does this specific case justify throwing an exception?
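
For reference, the "overwrite and log a warning" option could look roughly
like the sketch below. The class and method names are hypothetical and this
is not the actual KafkaSink code; it only shows the intended behavior using
java.util.Properties and java.util.logging.

```java
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical sketch; NOT the actual KafkaSink implementation.
class SinkPropertyOverrideSketch {
    private static final Logger LOG =
            Logger.getLogger(SinkPropertyOverrideSketch.class.getName());

    static final String TWO_PC_KEY = "transaction.two.phase.commit.enable";

    /** Copies the user's properties and forces the 2PC flag to follow the EOS setting. */
    static Properties effectiveProducerProps(Properties userProps, boolean exactlyOnce) {
        Properties effective = new Properties();
        effective.putAll(userProps);

        String enforced = String.valueOf(exactlyOnce);
        String userValue = userProps.getProperty(TWO_PC_KEY);
        if (userValue != null && !userValue.equals(enforced)) {
            // Alternative under discussion: throw an exception here instead.
            LOG.warning("Overwriting user-provided " + TWO_PC_KEY + "=" + userValue
                    + " with " + enforced);
        }
        effective.setProperty(TWO_PC_KEY, enforced);
        return effective;
    }

    public static void main(String[] args) {
        Properties userProps = new Properties();
        userProps.setProperty(TWO_PC_KEY, "false");
        // EOS is desired, so the sink forces the flag to true and logs a warning.
        System.out.println(effectiveProducerProps(userProps, true).getProperty(TWO_PC_KEY));
    }
}
```

Switching to the stricter behavior would only mean replacing the
LOG.warning call with a thrown exception.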


Hope this helps clarify things!

Best regards,
Gordon

On Thu, Aug 31, 2023 at 3:30 PM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hi Gordon!
>
> Thank you for publishing this FLIP! I would like to ask several questions
> and confirm my understanding of several aspects of this proposal. Even
> though this discussion is focused on FLIP-319, as it is based on top of
> KIP-939, my questions will also cover the KIP.
>
>
>
>    1. Why does KafkaProducer have both commitTransaction and
>    completeTransaction methods? Why can't we fuse them?
>       1. Is it correct that commitTransaction and abortTransaction bring
>       the transaction to its final state (committed/aborted) while
>       completeTransaction is a recovery method that rolls a txn
> back/forward,
>       depending on the prepared state?
>       2. In what situations is PreparedTransactionState in the state store
>       different from PreparedTransactionState in Kafka?
>    2. Why does the pre-commit phase initialize the producer with
>    currentProducer.initTransaction(false)?
>       1. Is it because "false" means that we do not expect any prepared txn
>       to be there, and if there is one, we should either roll it
> forward or abort
>       it? In the pre-commit phase with a new producer, there shouldn't be
> any
>       dangling txns.
>    3. Shouldn't we call completeTransaction on restore instead of
>    commitTransaction? In what situations would the flink Kafka connector
> abort
>    the transaction?
>    4. Do we need to keep the current KafkaInternalProducer for a while to
>    remain compatible with older Kafka versions that do not support KIP-939?
>    5. How will the connector handle
>    transaction.two.phase.commit.enable=false on the broker (not client)
> level?
>    6. Does it make sense for the connector users to override
>    transaction.two.phase.commit.enable? If it does not make sense, would
> the
>    connector ignore the config or throw an exception when it is passed?
>
>
> Best regards,
> Alex
>
> On Wed, Aug 23, 2023 at 6:09 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi Gordon!
> >
> > Thank you for preparing the detailed FLIP, I think this is a huge
> > improvement that enables the exactly-once Kafka sink in many
> environments /
> > use-cases where this was previously unfeasible due to the limitations
> > highlighted in the FLIP.
> >
> > Big +1
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Aug 18, 2023 at 7:54 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org
> >
> > wrote:
> >
> > > Hi Flink devs,
> > >
> > > I’d like to officially start a discussion for FLIP-319: Integrating
> with
> > > Kafka’s proper support for 2PC participation (KIP-939) [1].
> > >
> > > This is the “sister” joint FLIP for KIP-939 [2] [3]. It has been a
> > > long-standing issue that Flink’s Kafka connector doesn’t work fully
> > > correctly under exactly-once mode due to lack of distributed
> transaction
> > > support in the Kafka transaction protocol. This has led to subpar hacks
> > in
> > > the connector such as Java reflections to workaround the protocol's
> > > limitations (which causes a bunch of problems on its own, e.g. long
> > > recovery times for the connector), while still having corner case
> > scenarios
> > > that can lead to data loss.
> > >
> > > This joint effort with the Kafka community attempts to address this so
> > that
> > > the Flink Kafka connector can finally work against public Kafka APIs,
> > which
> > > should result in a much more robust integration between the two
> systems,
> > > and for Flink developers, easier maintainability of the code.
> > >
> > > Obviously, actually implementing this FLIP relies on the joint KIP
> being
> > > implemented and released first. Nevertheless, I'd like to start the
> > > discussion for the design as early as possible so we can benefit from
> the
> > > new Kafka changes as soon as it is available.
> > >
> > > Looking forward to feedback and comments on the proposal!
> > >
> > > Thanks,
> > > Gordon
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > [3] https://lists.apache.org/thread/wbs9sqs3z1tdm7ptw5j4o9osmx9s41nf
> > >
> >
>
