Hey Guozhang, Bruno,

Thank you for your feedback. I am going to respond to both of you in a
single email. I hope it is okay.

@Guozhang,

> We could, instead, have a global
> config to specify if the built-in stores should be transactional or not.


This was the original approach I took in this proposal. Earlier in this
thread, John, Sagar, and Bruno listed a number of issues with it. I tend to
agree with them that controlling transactionality via Materialized objects
probably makes for a better user experience.

> We could simplify our implementation for `commit`

Agreed! I updated the prototype and removed references to the commit marker
and rolling forward from the proposal.


@Bruno,

> So, I would remove the details about the 2-state-store implementation
> from the KIP or provide it as an example of a possible implementation at
> the end of the KIP.
>
I moved the section about the 2-state-store implementation to the bottom of
the proposal and now consistently refer to it as a reference implementation.
Please let me know if this is okay.

> Could you please describe the usage of commit() and recover() in the
> commit workflow in the KIP as we did in this thread but independently
> from the state store implementation?

I described how commit/recover change the workflow in the Overview section.

Best,
Alex

On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Alex,
>
> Thanks a lot for explaining!
>
> Now some aspects are clearer to me.
>
> While I now understand how the state store can roll forward, I have the
> feeling that rolling forward is specific to the 2-state-store
> implementation with RocksDB of your PoC. Other state store
> implementations might use a different strategy to react to crashes. For
> example, they might apply an atomic write and effectively rollback if
> they crash before committing the state store transaction. I think the
> KIP should not contain such implementation details but provide an
> interface to accommodate rolling forward and rolling backward.
>
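> Just to illustrate the kind of alternative strategy I have in mind (all
> names below are made up, this is not a proposed API), a store built on an
> atomic-write primitive could look roughly like this:
>
> import java.util.HashMap;
> import java.util.Map;
>
> class AtomicWriteStore {
>     private final Map<String, byte[]> uncommitted = new HashMap<>();
>
>     void put(final String key, final byte[] value) {
>         uncommitted.put(key, value); // nothing is persisted yet
>     }
>
>     void commit(final long changelogOffset) {
>         // a single atomic write: either all staged entries become durable
>         // together with the offset, or none of them do
>         persistAtomically(uncommitted, changelogOffset);
>         uncommitted.clear();
>     }
>
>     long recover(final long checkpointedOffset) {
>         uncommitted.clear(); // dropping staged writes is the rollback
>         return lastDurableOffset(); // replay the changelog from here
>     }
>
>     private void persistAtomically(final Map<String, byte[]> batch,
>                                    final long offset) {
>         // e.g., write the batch and the offset in one fsync'd update
>     }
>
>     private long lastDurableOffset() {
>         return -1L; // placeholder for the engine's committed offset
>     }
> }
>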
> So, I would remove the details about the 2-state-store implementation
> from the KIP or provide it as an example of a possible implementation at
> the end of the KIP.
>
> Since a state store implementation can roll forward or roll back, I
> think it is fine to return the changelog offset from recover(). With the
> returned changelog offset, Streams knows from where to start state store
> restoration.
>
> Could you please describe the usage of commit() and recover() in the
> commit workflow in the KIP as we did in this thread but independently
> from the state store implementation? That would make things clearer.
> Additionally, descriptions of failure scenarios would also be helpful.
>
> Best,
> Bruno
>
>
> On 04.08.22 16:39, Alexander Sorokoumov wrote:
> > Hey Bruno,
> >
> > Thank you for the suggestions and the clarifying questions. I believe
> that
> > they cover the core of this proposal, so it is crucial for us to be on
> the
> > same page.
> >
> > 1. Don't you want to deprecate StateStore#flush()?
> >
> >
> > Good call! I updated both the proposal and the prototype.
> >
> >   2. I would shorten Materialized#withTransactionalityEnabled() to
> >> Materialized#withTransactionsEnabled().
> >
> >
> > Turns out, these methods are no longer necessary. I removed them from the
> > proposal and the prototype.
> >
> >
> >> 3. Could you also describe a bit more in detail where the offsets passed
> >> into commit() and recover() come from?
> >
> >
> > The offset passed into StateStore#commit is the last offset committed to
> > the changelog topic. The offset passed into StateStore#recover is the
> last
> > checkpointed offset for the given StateStore. Let's look at steps 3 and 4
> > in the commit workflow. After the TaskExecutor/TaskManager commits, it
> calls
> > StreamTask#postCommit[1] that in turn:
> > a. updates the changelog offsets via
> > ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come
> from
> > the RecordCollector[3], which tracks the latest offsets the producer sent
> > without exception[4, 5].
> > b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6].
> This
> > method essentially calls ProcessorStateManager methods - flush/commit[7]
> > and checkpoint[8]. ProcessorStateManager#commit goes over all state
> stores
> > that belong to that task and commits them with the offset obtained in
> step
> > `a`. ProcessorStateManager#checkpoint writes down those offsets for all
> > state stores, except for non-transactional ones in the case of EOS.
> >
> > During initialization, StreamTask calls
> > StateManagerUtil#registerStateStores[8] that in turn calls
> > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the
> > moment, this method assigns checkpointed offsets to the corresponding
> state
> > stores[10]. The prototype also calls StateStore#recover with the
> > checkpointed offset and assigns the offset returned by recover()[11].
> >
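> > To make the initialization part concrete, here is a heavily simplified
> > sketch of what the prototype does in [9]-[11]; all names are
> > illustrative, not the actual Streams code:
> >
> > import java.util.List;
> > import java.util.Map;
> > import java.util.function.BiConsumer;
> >
> > interface RecoverableStore {
> >     String name();
> >     long recover(Long checkpointedOffset); // returns the recovered offset
> > }
> >
> > final class StoreInitialization {
> >     static void initializeStores(
> >             final List<RecoverableStore> stores,
> >             final Map<String, Long> checkpointedOffsets,
> >             final BiConsumer<RecoverableStore, Long> registerForRestore) {
> >         for (final RecoverableStore store : stores) {
> >             final Long checkpointed = checkpointedOffsets.get(store.name());
> >             // recover() brings the store into a consistent state and
> >             // reports which changelog offset that state corresponds to
> >             final long validOffset = store.recover(checkpointed);
> >             // changelog restoration then replays from validOffset
> >             registerForRestore.accept(store, validOffset);
> >         }
> >     }
> > }
> >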
> > 4. I do not quite understand how a state store can roll forward. You
> >> mention in the thread the following:
> >
> >
> > The 2-state-stores commit looks like this [12] (a simplified code
> > sketch follows the list):
> >
> >     1. Flush the temporary state store.
> >     2. Create a commit marker with a changelog offset corresponding to
> the
> >     state we are committing.
> >     3. Go over all keys in the temporary store and write them down to the
> >     main one.
> >     4. Wipe the temporary store.
> >     5. Delete the commit marker.
> >
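> > In simplified code, the commit could look like this (tmpStore and
> > mainStore are assumed to be KeyValueStore<Bytes, byte[]> fields, and the
> > marker helpers are illustrative; see [12] for the actual prototype):
> >
> > void commit(final long changelogOffset) {
> >     tmpStore.flush();                    // 1. persist uncommitted writes
> >     writeCommitMarker(changelogOffset);  // 2. marker holds the offset
> >     try (KeyValueIterator<Bytes, byte[]> it = tmpStore.all()) {
> >         while (it.hasNext()) {           // 3. copy every key from the
> >             KeyValue<Bytes, byte[]> kv = it.next(); // temporary store
> >             mainStore.put(kv.key, kv.value);        // to the main one
> >         }
> >     }
> >     wipeTmpStore();                      // 4. wipe the temporary store
> >     deleteCommitMarker();                // 5. the commit is complete
> > }
> >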
> >
> > Let's consider crash failure scenarios:
> >
> >     - Crash failure happens between steps 1 and 2. The main state store
> is
> >     in a consistent state that corresponds to the previously checkpointed
> >     offset. StateStore#recover throws away the temporary store and
> proceeds
> >     from the last checkpointed offset.
> >     - Crash failure happens between steps 2 and 3. We do not know which
> >     keys from the temporary store were already written to the main
> >     store, so we can't roll back. There are two options - either wipe
> >     the main store or roll forward. Since the point of this proposal is
> >     to avoid throwing away the state, and since we do not care which
> >     consistent state the store ends up in, we roll forward by continuing
> >     from step 3.
> >     - Crash failure happens between steps 3 and 4. We can't distinguish
> >     between this and the previous scenario, so we write all the keys
> from the
> >     temporary store. This is okay because the operation is idempotent.
> >     - Crash failure happens between steps 4 and 5. Again, we can't
> >     distinguish between this and previous scenarios, but the temporary
> store is
> >     already empty. Even though we write all keys from the temporary
> store, this
> >     operation is, in fact, no-op.
> >     - Crash failure happens between step 5 and checkpoint. This is the
> case
> >     you referred to in question 5. The commit is finished, but it is not
> >     reflected at the checkpoint. recover() returns the offset of the
> previous
> >     commit here, which is incorrect, but it is okay because we will
> replay the
> >     changelog from the previously committed offset. As changelog replay
> is
> >     idempotent, the state store recovers into a consistent state.
> >
> > The last crash failure scenario is a natural transition to
> >
> > how should Streams know what to write into the checkpoint file
> >> after the crash?
> >>
> >
> > As mentioned above, the Streams app writes the checkpoint file after
> > both the Kafka transaction commit and the StateStore commit. Same as
> > without the proposal, it should write the committed offset, as it is the
> > same for both the Kafka changelog and the state store.
> >
> >
> >> This issue arises because we store the offset outside of the state
> >> store. Maybe we need an additional method on the state store interface
> >> that returns the offset at which the state store is.
> >
> >
> > In my opinion, we should include in the interface only the guarantees
> > that are necessary to preserve EOS without wiping the local state. This
> > way, we allow more room for possible implementations. Thanks to the
> > idempotency of the changelog replay, it is "good enough" if
> > StateStore#recover returns an offset that is less than the actual one.
> > The only limitation here is that the state store should never commit
> > writes that are not yet committed in the Kafka changelog.
> >
> > Please let me know what you think about this. First of all, I am
> > relatively new to the codebase, so I might be wrong in my understanding
> > of how it works. Second, while writing this, it occurred to me that the
> > StateStore#recover interface method is not as straightforward as it
> > could be. Maybe we can change it like this:
> >
> > /**
> >  * Recover a transactional state store.
> >  * <p>
> >  * If a transactional state store shut down with a crash failure, this
> >  * method ensures that the state store is in a consistent state that
> >  * corresponds to {@code changelogOffset} or later.
> >  *
> >  * @param changelogOffset the checkpointed changelog offset.
> >  * @return {@code true} if recovery succeeded, {@code false} otherwise.
> >  */
> > boolean recover(final Long changelogOffset);
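> >
> > For the 2-state-store reference implementation, the body of recover()
> > could boil down to the following (the helper names are illustrative):
> >
> > boolean recover(final Long changelogOffset) {
> >     if (commitMarkerExists()) {
> >         // We crashed during a commit: the temporary store is already
> >         // flushed, so we roll forward by re-running steps 3-5 of the
> >         // commit. The copy is idempotent, so repeating it is safe.
> >         copyTmpStoreToMainStore();
> >         wipeTmpStore();
> >         deleteCommitMarker();
> >     } else {
> >         // We crashed outside of a commit: uncommitted writes live only
> >         // in the temporary store, so dropping it rolls the state back
> >         // to the last committed offset.
> >         wipeTmpStore();
> >     }
> >     return true;
> > }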
> >
> > Note: all links below except for [10] lead to the prototype's code.
> > 1.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
> > 2.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
> > 3.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
> > 4.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
> > 5.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
> > 6.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
> > 7.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
> > 8.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
> > 9.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
> > 10.
> >
> https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
> > 11.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
> > 12.
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88
> >
> > Best,
> > Alex
> >
> > On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org>
> wrote:
> >
> >> Hi Alex,
> >>
> >> Thanks for the updates!
> >>
> >> 1. Don't you want to deprecate StateStore#flush()? As far as I
> >> understand, commit() is the new flush(), right? If you do not deprecate
> >> it, you don't get rid of the room for error you describe in your KIP by
> >> having both a flush() and a commit().
> >>
> >>
> >> 2. I would shorten Materialized#withTransactionalityEnabled() to
> >> Materialized#withTransactionsEnabled().
> >>
> >>
> >> 3. Could you also describe a bit more in detail where the offsets passed
> >> into commit() and recover() come from?
> >>
> >>
> >> For my next two points, I need the commit workflow that you were so kind
> >> to post into this thread:
> >>
> >> 1. write stuff to the state store
> >> 2. producer.sendOffsetsToTransaction(token);
> producer.commitTransaction();
> >> 3. flush (<- that would be a call to commit(), right?)
> >> 4. checkpoint
> >>
> >>
> >> 4. I do not quite understand how a state store can roll forward. You
> >> mention in the thread the following:
> >>
> >> "If the crash failure happens during #3, the state store can roll
> >> forward and finish the flush/commit."
> >>
> >> How does the state store know where it stopped the flushing when it
> >> crashed?
> >>
> >> This seems an optimization to me. I think in general the state store
> >> should rollback to the last successfully committed state and restore
> >> from there until the end of the changelog topic partition. The last
> >> committed state is the offsets in the checkpoint file.
> >>
> >>
> >> 5. In the same e-mail from point 4, you also state:
> >>
> >> "If the crash failure happens between #3 and #4, the state store should
> >> do nothing during recovery and just proceed with the checkpoint."
> >>
> >> How should Streams know that the failure was between #3 and #4 during
> >> recovery? It just sees a valid state store and a valid checkpoint file.
> >> Streams does not know that the state of the checkpoint file does not
> >> match with the committed state of the state store.
> >> Also, how should Streams know what to write into the checkpoint file
> >> after the crash?
> >> This issue arises because we store the offset outside of the state
> >> store. Maybe we need an additional method on the state store interface
> >> that returns the offset at which the state store is.
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >>
> >> On 27.07.22 11:51, Alexander Sorokoumov wrote:
> >>> Hey Nick,
> >>>
> >>> Thank you for the kind words and the feedback! I'll definitely add an
> >>> option to configure the transactional mechanism in the Stores factory
> >>> methods via an argument, as John previously suggested, and might add
> >>> the in-memory option via RocksDB Indexed Batches if I figure out why
> >>> their creation via the RocksDB JNI fails with
> >>> `UnsatisfiedLinkException`.
> >>>
> >>> Best,
> >>> Alex
> >>>
> >>> On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov <
> >>> asorokou...@confluent.io> wrote:
> >>>
> >>>> Hey Guozhang,
> >>>>
> >>>> 1) About the param passed into the `recover()` function: it seems to
> me
> >>>>> that the semantics of "recover(offset)" is: recover this state to a
> >>>>> transaction boundary which is at least the passed-in offset. And the
> >> only
> >>>>> possibility that the returned offset is different than the passed-in
> >>>>> offset
> >>>>> is that if the previous failure happens after we've done all the
> commit
> >>>>> procedures except writing the new checkpoint, in which case the
> >> returned
> >>>>> offset would be larger than the passed-in offset. Otherwise it should
> >>>>> always be equal to the passed-in offset, is that right?
> >>>>
> >>>>
> >>>> Right now, the only case when `recover` returns an offset different
> >>>> from the passed one is when the failure happens *during* commit.
> >>>>
> >>>>
> >>>> If the failure happens after commit but before the checkpoint,
> >>>> `recover` might return either the passed offset or a newer committed
> >>>> offset, depending on the implementation. The `recover` implementation
> >>>> in the prototype returns the passed offset because it deletes the
> >>>> commit marker that holds that offset after the commit is done. In
> >>>> that case, the store will replay the last commit from the changelog.
> >>>> I think it is fine as the changelog replay is idempotent.
> >>>>
> >>>> 2) It seems the only use for the "transactional()" function is to
> >> determine
> >>>>> if we can update the checkpoint file while in EOS.
> >>>>
> >>>>
> >>>> Right now, there are 2 other uses for `transactional()`:
> >>>> 1. To determine what to do during initialization if the checkpoint is
> >>>> gone (see [1]). If the state store is transactional, we don't have to
> >>>> wipe the existing data. Thinking about it now, we do not really need
> >>>> to check whether the store is `transactional` because if it is not,
> >>>> we'd not have written the checkpoint in the first place. I am going
> >>>> to remove that check.
> >>>> 2. To determine if the persistent kv store in KStreamImplJoin should
> be
> >>>> transactional (see [2], [3]).
> >>>>
> >>>> I am not sure if we can get rid of the checks in point 2. If so, I'd
> be
> >>>> happy to encapsulate `transactional()` logic in `commit/recover`.
> >>>>
> >>>> Best,
> >>>> Alex
> >>>>
> >>>> 1.
> >>>>
> >>
> https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
> >>>> 2.
> >>>>
> >>
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
> >>>> 3.
> >>>>
> >>
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354
> >>>>
> >>>> On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Alex,
> >>>>>
> >>>>> Excellent proposal, I'm very keen to see this land!
> >>>>>
> >>>>> Would it be useful to permit configuring the type of store used for
> >>>>> uncommitted offsets on a store-by-store basis? This way, users could
> >>>>> choose
> >>>>> whether to use, e.g. an in-memory store or RocksDB, potentially
> >> reducing
> >>>>> the overheads associated with RocksDb for smaller stores, but without
> >> the
> >>>>> memory pressure issues?
> >>>>>
> >>>>> I suspect that in most cases, the number of uncommitted records will
> be
> >>>>> very small, because the default commit interval is 100ms.
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Nick
> >>>>>
> >>>>> On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hello Alex,
> >>>>>>
> >>>>>> Thanks for the updated KIP, I looked over it and browsed the WIP and
> >>>>> just
> >>>>>> have a couple meta thoughts:
> >>>>>>
> >>>>>> 1) About the param passed into the `recover()` function: it seems to
> >> me
> >>>>>> that the semantics of "recover(offset)" is: recover this state to a
> >>>>>> transaction boundary which is at least the passed-in offset. And the
> >>>>> only
> >>>>>> possibility that the returned offset is different than the passed-in
> >>>>> offset
> >>>>>> is that if the previous failure happens after we've done all the
> >> commit
> >>>>>> procedures except writing the new checkpoint, in which case the
> >> returned
> >>>>>> offset would be larger than the passed-in offset. Otherwise it
> should
> >>>>>> always be equal to the passed-in offset, is that right?
> >>>>>>
> >>>>>> 2) It seems the only use for the "transactional()" function is to
> >>>>> determine
> >>>>>> if we can update the checkpoint file while in EOS. But the purpose
> of
> >>>>> the
> >>>>>> checkpoint file's offsets is just to tell "the local state's current
> >>>>>> snapshot's progress is at least the indicated offsets" anyways, and
> >> with
> >>>>>> this KIP maybe we would just do:
> >>>>>>
> >>>>>> a) when in ALOS, upon failover: we set the starting offset as
> >>>>>> checkpointed-offset, then restore() from changelog till the
> >> end-offset.
> >>>>>> This way we may restore some records twice.
> >>>>>> b) when in EOS, upon failover: we first call
> >>>>> recover(checkpointed-offset),
> >>>>>> then set the starting offset as the returned offset (which may be
> >> larger
> >>>>>> than checkpointed-offset), then restore until the end-offset.
> >>>>>>
> >>>>>> So why not also:
> >>>>>> c) we let the `commit()` function to also return an offset, which
> >>>>> indicates
> >>>>>> "checkpointable offsets".
> >>>>>> d) for existing non-transactional stores, we just have a default
> >>>>>> implementation of "commit()" which is simply a flush, and returns a
> >>>>>> sentinel value like -1. Then later, if we get the checkpointable
> >>>>>> offset -1, we do not write the checkpoint. Upon clean shutdown we
> >>>>>> can just checkpoint regardless of the returned value from "commit".
> >>>>>> e) for existing non-transactional stores, we just have a default
> >>>>>> implementation of "recover()" which is to wipe out the local store
> >>>>>> and return offset 0 if the passed-in offset is -1; otherwise, if it
> >>>>>> is not -1, it indicates a clean shutdown in the last run, and this
> >>>>>> function is just a no-op.
> >>>>>>
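> >>>>>> For illustration, d) and e) could be default methods along these
> >>>>>> lines (just a sketch, the signatures are not final):
> >>>>>>
> >>>>>> default long commit(final long changelogOffset) {
> >>>>>>     flush();      // non-transactional stores simply flush
> >>>>>>     return -1L;   // sentinel: "no checkpointable offset"
> >>>>>> }
> >>>>>>
> >>>>>> default long recover(final long checkpointedOffset) {
> >>>>>>     if (checkpointedOffset == -1L) {
> >>>>>>         wipeLocalState(); // hypothetical helper on the store
> >>>>>>         return 0L;        // replay the changelog from the beginning
> >>>>>>     }
> >>>>>>     return checkpointedOffset; // clean shutdown last run: no-op
> >>>>>> }
> >>>>>>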
> >>>>>> In that case, we would not need the "transactional()" function
> >> anymore,
> >>>>>> since for non-transactional stores their behaviors are still wrapped
> >> in
> >>>>> the
> >>>>>> `commit / recover` function pairs.
> >>>>>>
> >>>>>> I have not completed the thorough pass on your WIP PR, so maybe I
> >> could
> >>>>>> come up with some more feedback later, but just let me know if my
> >>>>>> understanding above is correct or not?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov
> >>>>>> <asorokou...@confluent.io.invalid> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I updated the KIP with the following changes:
> >>>>>>> * Replaced in-memory batches with the secondary-store approach as
> the
> >>>>>>> default implementation to address the feedback about memory
> pressure
> >>>>> as
> >>>>>>> suggested by Sagar and Bruno.
> >>>>>>> * Introduced StateStore#commit and StateStore#recover methods as an
> >>>>>>> extension of the rollback idea. @Guozhang, please see the comment
> >>>>> below
> >>>>>> on
> >>>>>>> why I took a slightly different approach than you suggested.
> >>>>>>> * Removed mentions of changes to IQv1 and IQv2. Transactional state
> >>>>>> stores
> >>>>>>> enable reading committed in IQ, but it is really an independent
> >>>>> feature
> >>>>>>> that deserves its own KIP. Conflating them unnecessarily increases
> >> the
> >>>>>>> scope for discussion, implementation, and testing in a single unit
> of
> >>>>>> work.
> >>>>>>>
> >>>>>>> I also published a prototype -
> >>>>>> https://github.com/apache/kafka/pull/12393
> >>>>>>> that implements changes described in the proposal.
> >>>>>>>
> >>>>>>> Regarding explicit rollback, I think it is a powerful idea that
> >> allows
> >>>>>>> other StateStore implementations to take a different path to the
> >>>>>>> transactional behavior rather than keep 2 state stores. Instead of
> >>>>>>> introducing a new commit token, I suggest using a changelog offset
> >>>>> that
> >>>>>>> already 1:1 corresponds to the materialized state. This works
> nicely
> >>>>>>> because Kafka Stream first commits an AK transaction and only then
> >>>>>>> checkpoints the state store, so we can use the changelog offset to
> >>>>> commit
> >>>>>>> the state store transaction.
> >>>>>>>
> >>>>>>> I called the method StateStore#recover rather than
> >>>>>>> StateStore#rollback because a state store might either roll back
> >>>>>>> or roll forward depending on the specific point of the crash
> >>>>>>> failure. Consider the write algorithm in Kafka Streams:
> >>>>>>> 1. write stuff to the state store
> >>>>>>> 2. producer.sendOffsetsToTransaction(token);
> >>>>>> producer.commitTransaction();
> >>>>>>> 3. flush
> >>>>>>> 4. checkpoint
> >>>>>>>
> >>>>>>> Let's consider 3 cases:
> >>>>>>> 1. If the crash failure happens between #2 and #3, the state store
> >>>>> rolls
> >>>>>>> back and replays the uncommitted transaction from the changelog.
> >>>>>>> 2. If the crash failure happens during #3, the state store can roll
> >>>>>> forward
> >>>>>>> and finish the flush/commit.
> >>>>>>> 3. If the crash failure happens between #3 and #4, the state store
> >>>>> should
> >>>>>>> do nothing during recovery and just proceed with the checkpoint.
> >>>>>>>
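> >>>>>>> As a call sequence, the intended happy path is roughly the
> >>>>>>> following (simplified, with hypothetical variable names):
> >>>>>>>
> >>>>>>> store.put(key, value);                          // 1. write
> >>>>>>> producer.sendOffsetsToTransaction(offsets, groupMetadata);
> >>>>>>> producer.commitTransaction();                   // 2. commit txn
> >>>>>>> store.commit(committedChangelogOffset);         // 3. flush/commit
> >>>>>>> checkpointFile.write(committedChangelogOffset); // 4. checkpoint
> >>>>>>>
> >>>>>>> On restart, store.recover(checkpointedOffset) then decides between
> >>>>>>> the three cases above before changelog replay begins.
> >>>>>>>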
> >>>>>>> Looking forward to your feedback,
> >>>>>>> Alexander
> >>>>>>>
> >>>>>>> On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
> >>>>>>> asorokou...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> As a status update, I did the following changes to the KIP:
> >>>>>>>> * replaced configuration via the top-level config with
> configuration
> >>>>>> via
> >>>>>>>> Stores factory and StoreSuppliers,
> >>>>>>>> * added IQv2 and elaborated how readCommitted will work when the
> >>>>> store
> >>>>>> is
> >>>>>>>> not transactional,
> >>>>>>>> * removed claims about ALOS.
> >>>>>>>>
> >>>>>>>> I am going to be OOO in the next couple of weeks and will resume
> >>>>>> working
> >>>>>>>> on the proposal and responding to the discussion in this thread
> >>>>>> starting
> >>>>>>>> June 27. My next top priorities are:
> >>>>>>>> 1. Prototype the rollback approach as suggested by Guozhang.
> >>>>>>>> 2. Replace in-memory batches with the secondary-store approach as
> >>>>> the
> >>>>>>>> default implementation to address the feedback about memory
> >>>>> pressure as
> >>>>>>>> suggested by Sagar and Bruno.
> >>>>>>>> 3. Adjust Stores methods to make transactional implementations
> >>>>>> pluggable.
> >>>>>>>> 4. Publish the POC for the first review.
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Alex
> >>>>>>>>
> >>>>>>>> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Alex,
> >>>>>>>>>
> >>>>>>>>> Thanks for your replies! That is very helpful.
> >>>>>>>>>
> >>>>>>>>> Just to broaden our discussions a bit here, I think there are
> some
> >>>>>> other
> >>>>>>>>> approaches in parallel to the idea of "enforce to only persist
> upon
> >>>>>>>>> explicit flush" and I'd like to throw one here -- not really
> >>>>>> advocating
> >>>>>>>>> it,
> >>>>>>>>> but just for us to compare the pros and cons:
> >>>>>>>>>
> >>>>>>>>> 1) We let the StateStore's `flush` function to return a token
> >>>>> instead
> >>>>>> of
> >>>>>>>>> returning `void`.
> >>>>>>>>> 2) We add another `rollback(token)` interface of StateStore which
> >>>>>> would
> >>>>>>>>> effectively rollback the state as indicated by the token to the
> >>>>>> snapshot
> >>>>>>>>> when the corresponding `flush` is called.
> >>>>>>>>> 3) We encode the token and commit as part of
> >>>>>>>>> `producer#sendOffsetsToTransaction`.
> >>>>>>>>>
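> >>>>>>>>> Just for the sake of illustration, the two functions could look
> >>>>>>>>> like this (made-up signatures):
> >>>>>>>>>
> >>>>>>>>> // returns an opaque token identifying the flushed snapshot
> >>>>>>>>> byte[] flush();
> >>>>>>>>>
> >>>>>>>>> // restores the store to the snapshot the token points at
> >>>>>>>>> void rollback(byte[] token);
> >>>>>>>>>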
> >>>>>>>>> Users could optionally implement the new functions, or they can
> >>>>> just
> >>>>>> not
> >>>>>>>>> return the token at all and not implement the second function.
> >>>>> Again,
> >>>>>>> the
> >>>>>>>>> APIs are just for the sake of illustration, not feeling they are
> >>>>> the
> >>>>>>> most
> >>>>>>>>> natural :)
> >>>>>>>>>
> >>>>>>>>> Then the procedure would be:
> >>>>>>>>>
> >>>>>>>>> 1. the previous checkpointed offset is 100
> >>>>>>>>> ...
> >>>>>>>>> 3. flush store, make sure all writes are persisted; get the
> >>>>> returned
> >>>>>>> token
> >>>>>>>>> that indicates the snapshot of 200.
> >>>>>>>>> 4. producer.sendOffsetsToTransaction(token);
> >>>>>>> producer.commitTransaction();
> >>>>>>>>> 5. Update the checkpoint file (say, the new value is 200).
> >>>>>>>>>
> >>>>>>>>> Then if there's a failure, say between 3/4, we would get the
> token
> >>>>>> from
> >>>>>>>>> the
> >>>>>>>>> last committed txn, and first we would do the restoration (which
> >>>>> may
> >>>>>> get
> >>>>>>>>> the state to somewhere between 100 and 200), then call
> >>>>>>>>> `store.rollback(token)` to rollback to the snapshot of offset
> 100.
> >>>>>>>>>
> >>>>>>>>> The pro is that we would then not need to force the state stores
> >>>>>>>>> not to persist any data during the txn: stores that cannot
> >>>>>>>>> implement the `rollback` function can still reduce their impl to
> >>>>>>>>> "not persisting any data" via this API, while stores that can
> >>>>>>>>> indeed support the rollback may have a more efficient
> >>>>>>>>> implementation. The cons, though, off the top of my head are 1)
> >>>>>>>>> more complicated logic differentiating between EOS with and
> >>>>>>>>> without store rollback support, and ALOS, 2) encoding the token
> >>>>>>>>> as part of the commit offset is not ideal if it is big, 3) the
> >>>>>>>>> recovery logic including the state store is also a bit more
> >>>>>>>>> complicated.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov
> >>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>
> >>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it
> >>>>> seems
> >>>>>>>>> that we
> >>>>>>>>>>> would achieve it by enforcing to not persist any data written
> >>>>>> within
> >>>>>>>>> this
> >>>>>>>>>>> transaction until step 4. Is that correct?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> This is correct. Both alternatives - in-memory
> >>>>> WriteBatchWithIndex
> >>>>>> and
> >>>>>>>>>> transactionality via the secondary store guarantee EOS by not
> >>>>>>> persisting
> >>>>>>>>>> data in the "main" state store until it is committed in the
> >>>>>> changelog
> >>>>>>>>>> topic.
> >>>>>>>>>>
> >>>>>>>>>> Oh what I meant is not what KStream code does, but that
> >>>>> StateStore
> >>>>>>> impl
> >>>>>>>>>>> classes themselves could potentially flush data to become
> >>>>>> persisted
> >>>>>>>>>>> asynchronously
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thank you for elaborating! You are correct, the underlying state
> >>>>>> store
> >>>>>>>>>> should not persist data until the streams app calls
> >>>>>> StateStore#flush.
> >>>>>>>>> There
> >>>>>>>>>> are 2 options how a State Store implementation can guarantee
> >>>>> that -
> >>>>>>>>> either
> >>>>>>>>>> keep uncommitted writes in memory or be able to roll back the
> >>>>>> changes
> >>>>>>>>> that
> >>>>>>>>>> were not committed during recovery. RocksDB's
> >>>>> WriteBatchWithIndex is
> >>>>>>> an
> >>>>>>>>>> implementation of the first option. A considered alternative,
> >>>>>>>>> Transactions
> >>>>>>>>>> via Secondary State Store for Uncommitted Changes, is the way to
> >>>>>>>>> implement
> >>>>>>>>>> the second option.
> >>>>>>>>>>
> >>>>>>>>>> As everyone correctly pointed out, keeping uncommitted data in
> >>>>>> memory
> >>>>>>>>>> introduces a very real risk of OOM that we will need to handle.
> >>>>> The
> >>>>>>>>> more I
> >>>>>>>>>> think about it, the more I lean towards going with the
> >>>>> Transactions
> >>>>>>> via
> >>>>>>>>>> Secondary Store as the way to implement transactionality as it
> >>>>> does
> >>>>>>> not
> >>>>>>>>>> have that issue.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Alex
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <
> >>>>> wangg...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Alex,
> >>>>>>>>>>>
> >>>>>>>>>>>> we flush the cache, but not the underlying state store.
> >>>>>>>>>>>
> >>>>>>>>>>> You're right. The ordering I mentioned above is actually:
> >>>>>>>>>>>
> >>>>>>>>>>> ...
> >>>>>>>>>>> 3. producer.sendOffsetsToTransaction();
> >>>>>>> producer.commitTransaction();
> >>>>>>>>>>> 4. flush store, make sure all writes are persisted.
> >>>>>>>>>>> 5. Update the checkpoint file to 200.
> >>>>>>>>>>>
> >>>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it
> >>>>>> seems
> >>>>>>>>> that
> >>>>>>>>>> we
> >>>>>>>>>>> would achieve it by enforcing to not persist any data written
> >>>>>> within
> >>>>>>>>> this
> >>>>>>>>>>> transaction until step 4. Is that correct?
> >>>>>>>>>>>
> >>>>>>>>>>>> Can you please point me to the place in the codebase where we
> >>>>>>>>> trigger
> >>>>>>>>>>> async flush before the commit?
> >>>>>>>>>>>
> >>>>>>>>>>> Oh what I meant is not what KStream code does, but that
> >>>>> StateStore
> >>>>>>>>> impl
> >>>>>>>>>>> classes themselves could potentially flush data to become
> >>>>>> persisted
> >>>>>>>>>>> asynchronously, e.g. RocksDB does that naturally out of the
> >>>>>> control
> >>>>>>> of
> >>>>>>>>>>> KStream code. I think it is related to my previous question:
> >>>>> if we
> >>>>>>>>> think
> >>>>>>>>>> by
> >>>>>>>>>>> guaranteeing EOS at the state store level, we would effectively
> >>>>>> ask
> >>>>>>>>> the
> >>>>>>>>>>> impl classes that "you should not persist any data until
> >>>>> `flush`
> >>>>>> is
> >>>>>>>>>> called
> >>>>>>>>>>> explicitly", is the StateStore interface the right level to
> >>>>>> enforce
> >>>>>>>>> such
> >>>>>>>>>>> mechanisms, or should we just do that on top of the
> >>>>> StateStores,
> >>>>>>> e.g.
> >>>>>>>>>>> during the transaction we just keep all the writes in the cache
> >>>>>> (of
> >>>>>>>>>> course
> >>>>>>>>>>> we need to consider how to work around memory pressure as
> >>>>>> previously
> >>>>>>>>>>> mentioned), and then upon committing, we just write the cached
> >>>>>>> records
> >>>>>>>>>> as a
> >>>>>>>>>>> whole into the store and then call flush.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
> >>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the wealth of great suggestions and questions!
> >>>>> I
> >>>>>> am
> >>>>>>>>> going
> >>>>>>>>>>> to
> >>>>>>>>>>>> address the feedback in batches and update the proposal
> >>>>> async,
> >>>>>> as
> >>>>>>>>> it is
> >>>>>>>>>>>> probably going to be easier for everyone. I will also write a
> >>>>>>>>> separate
> >>>>>>>>>>>> message after making updates to the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @John,
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Did you consider instead just adding the option to the
> >>>>>>>>>>>>> RocksDB*StoreSupplier classes and the factories in Stores ?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for suggesting that. I think that this idea is
> >>>>> better
> >>>>>>> than
> >>>>>>>>>>> what I
> >>>>>>>>>>>> came up with and will update the KIP with configuring
> >>>>>>>>> transactionality
> >>>>>>>>>>> via
> >>>>>>>>>>>> the suppliers and Stores.
> >>>>>>>>>>>>
> >>>>>>>>>>>> what is the advantage over just doing the same thing with the
> >>>>>>>>>> RecordCache
> >>>>>>>>>>>>> and not introducing the WriteBatch at all?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Can you point me to RecordCache? I can't find it in the
> >>>>> project.
> >>>>>>> The
> >>>>>>>>>>>> advantage would be that WriteBatch guarantees write
> >>>>> atomicity.
> >>>>>> As
> >>>>>>>>> far
> >>>>>>>>>> as
> >>>>>>>>>>> I
> >>>>>>>>>>>> understood the way RecordCache works, it might leave the
> >>>>> system
> >>>>>> in
> >>>>>>>>> an
> >>>>>>>>>>>> inconsistent state during crash failure on write.
> >>>>>>>>>>>>
> >>>>>>>>>>>> You mentioned that a transactional store can help reduce
> >>>>>>>>> duplication in
> >>>>>>>>>>> the
> >>>>>>>>>>>>> case of ALOS
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will remove claims about ALOS from the proposal. Thank you
> >>>>> for
> >>>>>>>>>>>> elaborating!
> >>>>>>>>>>>>
> >>>>>>>>>>>> As a reminder, we have a new IQv2 mechanism now. Should we
> >>>>>> propose
> >>>>>>>>> any
> >>>>>>>>>>>>> changes to IQv1 to support this transactional mechanism,
> >>>>>> versus
> >>>>>>>>> just
> >>>>>>>>>>>>> proposing it for IQv2? Certainly, it seems strange only to
> >>>>>>>>> propose a
> >>>>>>>>>>>> change
> >>>>>>>>>>>>> for IQv1 and not v2.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>    I will update the proposal with complementary API changes
> >>>>> for
> >>>>>>> IQv2
> >>>>>>>>>>>>
> >>>>>>>>>>>> What should IQ do if I request to readCommitted on a
> >>>>>>>>> non-transactional
> >>>>>>>>>>>>> store?
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can assume that non-transactional stores commit on write,
> >>>>> so
> >>>>>> IQ
> >>>>>>>>>> works
> >>>>>>>>>>> in
> >>>>>>>>>>>> the same way with non-transactional stores regardless of the
> >>>>>> value
> >>>>>>>>> of
> >>>>>>>>>>>> readCommitted.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>    @Guozhang,
> >>>>>>>>>>>>
> >>>>>>>>>>>> * If we crash between line 3 and 4, then at that time the
> >>>>> local
> >>>>>>>>>>> persistent
> >>>>>>>>>>>>> store image is representing as of offset 200, but upon
> >>>>>> recovery
> >>>>>>>>> all
> >>>>>>>>>>>>> changelog records from 100 to log-end-offset would be
> >>>>>> considered
> >>>>>>>>> as
> >>>>>>>>>>>> aborted
> >>>>>>>>>>>>> and not be replayed and we would restart processing from
> >>>>>>> position
> >>>>>>>>>> 100.
> >>>>>>>>>>>>> Restart processing will violate EOS. I'm not sure how e.g.
> >>>>>>>>> RocksDB's
> >>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and
> >>>>> step 5
> >>>>>>>>> could
> >>>>>>>>>> be
> >>>>>>>>>>>>> done atomically here.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you please point me to the place in the codebase where
> >>>>> a
> >>>>>>> task
> >>>>>>>>>>> flushes
> >>>>>>>>>>>> the store before committing the transaction?
> >>>>>>>>>>>> Looking at TaskExecutor (
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> >>>>>>>>>>>> ),
> >>>>>>>>>>>> StreamTask#prepareCommit (
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> >>>>>>>>>>>> ),
> >>>>>>>>>>>> and CachedStateStore (
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> >>>>>>>>>>>> )
> >>>>>>>>>>>> we flush the cache, but not the underlying state store.
> >>>>> Explicit
> >>>>>>>>>>>> StateStore#flush happens in
> >>>>> AbstractTask#maybeWriteCheckpoint (
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> >>>>>>>>>>>> ).
> >>>>>>>>>>>> Is there something I am missing here?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Today all cached data that have not been flushed are not
> >>>>>> committed
> >>>>>>>>> for
> >>>>>>>>>>>>> sure, but even flushed data to the persistent underlying
> >>>>> store
> >>>>>>> may
> >>>>>>>>>> also
> >>>>>>>>>>>> be
> >>>>>>>>>>>>> uncommitted since flushing can be triggered asynchronously
> >>>>>>> before
> >>>>>>>>> the
> >>>>>>>>>>>>> commit.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Can you please point me to the place in the codebase where we
> >>>>>>>>> trigger
> >>>>>>>>>>> async
> >>>>>>>>>>>> flush before the commit? This would certainly be a reason to
> >>>>>>>>> introduce
> >>>>>>>>>> a
> >>>>>>>>>>>> dedicated StateStore#commit method.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks again for the feedback. I am going to update the KIP
> >>>>> and
> >>>>>>> then
> >>>>>>>>>>>> respond to the next batch of questions and suggestions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Alex
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 30, 2022 at 5:13 PM Suhas Satish
> >>>>>>>>>>> <ssat...@confluent.io.invalid
> >>>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP proposal Alex.
> >>>>>>>>>>>>> 1. Configuration default
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You mention applications using streams DSL with built-in
> >>>>>> rocksDB
> >>>>>>>>>> state
> >>>>>>>>>>>>> store will get transactional state stores by default when
> >>>>> EOS
> >>>>>> is
> >>>>>>>>>>> enabled,
> >>>>>>>>>>>>> but the default implementation for apps using PAPI will
> >>>>>> fallback
> >>>>>>>>> to
> >>>>>>>>>>>>> non-transactional behavior.
> >>>>>>>>>>>>> Shouldn't we have the same default behavior for both types
> >>>>> of
> >>>>>>>>> apps -
> >>>>>>>>>>> DSL
> >>>>>>>>>>>>> and PAPI?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <
> >>>>>>> cado...@apache.org
> >>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the PR, Alex!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am also glad to see this coming.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Configuration
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would also prefer to restrict the configuration of
> >>>>>>>>>>>>>> transactionality to the state store. Ideally, calling the
> >>>>>>>>>>>>>> method transactional() on the state store would be enough.
> >>>>>>>>>>>>>> An option on the store builder would make it possible to
> >>>>>>>>>>>>>> turn transactionality on and off (as John proposed).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Memory usage in RocksDB
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This seems to be a major issue. We do not have any
> >>>>> guarantee
> >>>>>>>>> that
> >>>>>>>>>>>>>> uncommitted writes fit into memory and I guess we will
> >>>>> never
> >>>>>>>>> have.
> >>>>>>>>>>> What
> >>>>>>>>>>>>>> happens when the uncommitted writes do not fit into
> >>>>> memory?
> >>>>>>> Does
> >>>>>>>>>>>> RocksDB
> >>>>>>>>>>>>>> throw an exception? Can we handle such an exception
> >>>>> without
> >>>>>>>>>> crashing?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Does the RocksDB behavior even need to be included in
> >>>>> this
> >>>>>>> KIP?
> >>>>>>>>> In
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> end it is an implementation detail.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What we should consider - though - is a memory limit in
> >>>>> some
> >>>>>>>>> form.
> >>>>>>>>>>> And
> >>>>>>>>>>>>>> what we do when the memory limit is exceeded.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. PoC
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I agree with Guozhang that a PoC is a good idea to better
> >>>>>>>>>> understand
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> devils in the details.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 25.05.22 01:52, Guozhang Wang wrote:
> >>>>>>>>>>>>>>> Hello Alex,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for writing the proposal! Glad to see it coming. I
> >>>>>>>>>>>>>>> think this is the kind of KIP where too many devils are
> >>>>>>>>>>>>>>> buried in the details, so it's better to start working on
> >>>>>>>>>>>>>>> a POC, either in parallel or before we resume our
> >>>>>>>>>>>>>>> discussion, rather than blocking any implementation until
> >>>>>>>>>>>>>>> we are satisfied with the proposal.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Just as a concrete example, I personally am still not
> >>>>> 100%
> >>>>>>>>> clear
> >>>>>>>>>>> how
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> proposal would work to achieve EOS with the state
> >>>>> stores.
> >>>>>>> For
> >>>>>>>>>>>> example,
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> commit procedure today looks like this:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 0: there's an existing checkpoint file indicating the
> >>>>>>>>> changelog
> >>>>>>>>>>>> offset
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> the local state store image is 100. Now a commit is
> >>>>>>> triggered:
> >>>>>>>>>>>>>>> 1. flush cache (since it contains partially processed
> >>>>>>>>> records),
> >>>>>>>>>>> make
> >>>>>>>>>>>>> sure
> >>>>>>>>>>>>>>> all records are written to the producer.
> >>>>>>>>>>>>>>> 2. flush producer, making sure all changelog records
> >>>>> have
> >>>>>>> now
> >>>>>>>>>>> acked.
> >>>>>>>>>>>> //
> >>>>>>>>>>>>>>> here we would get the new changelog position, say 200
> >>>>>>>>>>>>>>> 3. flush store, make sure all writes are persisted.
> >>>>>>>>>>>>>>> 4. producer.sendOffsetsToTransaction();
> >>>>>>>>>>> producer.commitTransaction();
> >>>>>>>>>>>>> //
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> would make the writes in changelog up to offset 200
> >>>>>>> committed
> >>>>>>>>>>>>>>> 5. Update the checkpoint file to 200.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The question about atomicity between those lines, for
> >>>>>>> example:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> * If we crash between line 4 and line 5, the local
> >>>>>>> checkpoint
> >>>>>>>>>> file
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>> stay as 100, and upon recovery we would replay the
> >>>>>> changelog
> >>>>>>>>> from
> >>>>>>>>>>> 100
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> 200. This is not ideal but does not violate EOS, since
> >>>>> the
> >>>>>>>>>>> changelogs
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>>> all overwrites anyways.
> >>>>>>>>>>>>>>> * If we crash between line 3 and 4, then at that time
> >>>>> the
> >>>>>>>>> local
> >>>>>>>>>>>>>> persistent
> >>>>>>>>>>>>>>> store image is representing as of offset 200, but upon
> >>>>>>>>> recovery
> >>>>>>>>>> all
> >>>>>>>>>>>>>>> changelog records from 100 to log-end-offset would be
> >>>>>>>>> considered
> >>>>>>>>>> as
> >>>>>>>>>>>>>> aborted
> >>>>>>>>>>>>>>> and not be replayed and we would restart processing
> >>>>> from
> >>>>>>>>> position
> >>>>>>>>>>>> 100.
> >>>>>>>>>>>>>>> Restart processing will violate EOS. I'm not sure how
> >>>>> e.g.
> >>>>>>>>>> RocksDB's
> >>>>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and
> >>>>>>> step 5
> >>>>>>>>>>> could
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>> done atomically here.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Originally what I was thinking when creating the JIRA
> >>>>>> ticket
> >>>>>>>>> is
> >>>>>>>>>>> that
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>> need to let the state store to provide a transactional
> >>>>> API
> >>>>>>>>> like
> >>>>>>>>>>>> "token
> >>>>>>>>>>>>>>> commit()" used in step 4) above which returns a token,
> >>>>>> that
> >>>>>>>>> e.g.
> >>>>>>>>>> in
> >>>>>>>>>>>> our
> >>>>>>>>>>>>>>> example above indicates offset 200, and that token
> >>>>> would
> >>>>>> be
> >>>>>>>>>> written
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>> part
> >>>>>>>>>>>>>>> of the records in Kafka transaction in step 5). And
> >>>>> upon
> >>>>>>>>> recovery
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>> store would have another API like "rollback(token)"
> >>>>> where
> >>>>>>> the
> >>>>>>>>>> token
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>> from the latest committed txn, and be used to rollback
> >>>>> the
> >>>>>>>>> store
> >>>>>>>>>> to
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> committed image. I think your proposal is different,
> >>>>> and
> >>>>>> it
> >>>>>>>>> seems
> >>>>>>>>>>>> like
> >>>>>>>>>>>>>>> you're proposing we swap step 3) and 4) above, but the
> >>>>>>>>> atomicity
> >>>>>>>>>>>> issue
> >>>>>>>>>>>>>>> still remains since now you may have the store image at
> >>>>>> 100
> >>>>>>>>> but
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> changelog is committed at 200. I'd like to learn more
> >>>>>> about
> >>>>>>>>> the
> >>>>>>>>>>>> details
> >>>>>>>>>>>>>>> on how it resolves such issues.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Anyways, that's just an example to make the point that
> >>>>>> there
> >>>>>>>>> are
> >>>>>>>>>>> lots
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> implementational details which would drive the public
> >>>>> API
> >>>>>>>>> design,
> >>>>>>>>>>> and
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> should probably first do a POC, and come back to
> >>>>> discuss
> >>>>>> the
> >>>>>>>>> KIP.
> >>>>>>>>>>> Let
> >>>>>>>>>>>>> me
> >>>>>>>>>>>>>>> know what you think?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 AM Sagar <
> >>>>>>>>>> sagarmeansoc...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Alexander,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP! This seems like a great proposal.
> >>>>> I
> >>>>>>> have
> >>>>>>>>> the
> >>>>>>>>>>>> same
> >>>>>>>>>>>>>>>> opinion as John on the Configuration part though. I
> >>>>> think
> >>>>>>>>> the 2
> >>>>>>>>>>>> level
> >>>>>>>>>>>>>>>> config and its behaviour based on the
> >>>>> setting/unsetting
> >>>>>> of
> >>>>>>>>> the
> >>>>>>>>>>> flag
> >>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>> confusing to me as well. Since the KIP seems
> >>>>> specifically
> >>>>>>>>>> centred
> >>>>>>>>>>>>> around
> >>>>>>>>>>>>>>>> RocksDB it might be better to add it at the Supplier
> >>>>>> level
> >>>>>>> as
> >>>>>>>>>> John
> >>>>>>>>>>>>>>>> suggested.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On similar lines, this config name =>
> >>>>>>>>>>>>>> *statestore.transactional.mechanism
> >>>>>>>>>>>>>>>> *may
> >>>>>>>>>>>>>>>> also need rethinking as the value assigned to
> >>>>>>>>>>> it(rocksdb_indexbatch)
> >>>>>>>>>>>>>>>> implicitly seems to assume that rocksdb is the only
> >>>>>>>>> statestore
> >>>>>>>>>>> that
> >>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>> Stream supports while that's not the case.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Also, regarding the potential memory pressure that
> >>>>> can be
> >>>>>>>>>>> introduced
> >>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>> WriteBatchIndex, do you think it might make more
> >>>>> sense to
> >>>>>>>>>> include
> >>>>>>>>>>>> some
> >>>>>>>>>>>>>>>> numbers/benchmarks on how much the memory consumption
> >>>>>> might
> >>>>>>>>>>>> increase?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Lastly, the read_uncommitted flag's behaviour on IQ
> >>>>> may
> >>>>>>> need
> >>>>>>>>>> more
> >>>>>>>>>>>>>>>> elaboration.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> These points aside, as I said, this is a great
> >>>>> proposal!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 PM John Roesler <
> >>>>>>>>>>> vvcep...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the KIP, Alex!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'm really happy to see your proposal. This
> >>>>> improvement
> >>>>>>>>> fills a
> >>>>>>>>>>>>>>>>> long-standing gap.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I have a few questions:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. Configuration
> >>>>>>>>>>>>>>>>> The KIP only mentions RocksDB, but of course, Streams
> >>>>>> also
> >>>>>>>>>> ships
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> InMemory store, and users also plug in their own
> >>>>> custom
> >>>>>>>>> state
> >>>>>>>>>>>> stores.
> >>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> also common to use multiple types of state stores in
> >>>>> the
> >>>>>>>>> same
> >>>>>>>>>>>>>> application
> >>>>>>>>>>>>>>>>> for different purposes.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Against this backdrop, the choice to configure
> >>>>>>>>> transactionality
> >>>>>>>>>>> as
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> top-level config, as well as to configure the store
> >>>>>>>>> transaction
> >>>>>>>>>>>>>> mechanism
> >>>>>>>>>>>>>>>>> as a top-level config, seems a bit off.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Did you consider instead just adding the option to
> >>>>> the
> >>>>>>>>>>>>>>>>> RocksDB*StoreSupplier classes and the factories in
> >>>>>> Stores
> >>>>>>> ?
> >>>>>>>>> It
> >>>>>>>>>>>> seems
> >>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>> the desire to enable the feature by default, but
> >>>>> with a
> >>>>>>>>>>>> feature-flag
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> disable it was a factor here. However, as you pointed
> >>>>>> out,
> >>>>>>>>>> there
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>> major considerations that users should be aware of,
> >>>>> so
> >>>>>>>>> opt-in
> >>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>> seem
> >>>>>>>>>>>>>>>>> like a bad choice, either. You could add an Enum
> >>>>>> argument
> >>>>>>> to
> >>>>>>>>>>> those
> >>>>>>>>>>>>>>>>> factories like `RocksDBTransactionalMechanism.{NONE,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Some points in favor of this approach:
> >>>>>>>>>>>>>>>>> * Avoid "stores that don't support transactions
> >>>>> ignore
> >>>>>> the
> >>>>>>>>>>> config"
> >>>>>>>>>>>>>>>>> complexity
> >>>>>>>>>>>>>>>>> * Users can choose how to spend their memory budget,
> >>>>>>> making
> >>>>>>>>>> some
> >>>>>>>>>>>>> stores
> >>>>>>>>>>>>>>>>> transactional and others not
> >>>>>>>>>>>>>>>>> * When we add transactional support to in-memory
> >>>>> stores,
> >>>>>>> we
> >>>>>>>>>> don't
> >>>>>>>>>>>>> have
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> figure out what to do with the mechanism config
> >>>>> (i.e.,
> >>>>>>> what
> >>>>>>>>> do
> >>>>>>>>>>> you
> >>>>>>>>>>>>> set
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> mechanism to when there are multiple kinds of
> >>>>>>> transactional
> >>>>>>>>>>> stores
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> topology?)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. caching/flushing/transactions
> >>>>>>>>>>>>>>>>> The coupling between memory usage and flushing that
> >>>>> you
> >>>>>>>>>> mentioned
> >>>>>>>>>>>> is
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> bit
> >>>>>>>>>>>>>>>>> troubling. It also occurs to me that there seems to
> >>>>> be
> >>>>>>> some
> >>>>>>>>>>>>>> relationship
> >>>>>>>>>>>>>>>>> with the existing record cache, which is also an
> >>>>>> in-memory
> >>>>>>>>>>> holding
> >>>>>>>>>>>>> area
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> records that are not yet written to the cache and/or
> >>>>>> store
> >>>>>>>>>>> (albeit
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>> particular semantics). Have you considered how all
> >>>>> these
> >>>>>>>>>>> components
> >>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>> relate? For example, should a "full" WriteBatch
> >>>>> actually
> >>>>>>>>>> trigger
> >>>>>>>>>>> a
> >>>>>>>>>>>>>> flush
> >>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>> that we don't get OOMEs? If the proposed
> >>>>> transactional
> >>>>>>>>>> mechanism
> >>>>>>>>>>>>> forces
> >>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>> uncommitted writes to be buffered in memory, until a
> >>>>>>> commit,
> >>>>>>>>>> then
> >>>>>>>>>>>>> what
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> the advantage over just doing the same thing with the
> >>>>>>>>>> RecordCache
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>> introducing the WriteBatch at all?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. ALOS
> >>>>>>>>>>>>>>>>> You mentioned that a transactional store can help
> >>>>> reduce
> >>>>>>>>>>>> duplication
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> the case of ALOS. We might want to be careful about
> >>>>>> claims
> >>>>>>>>> like
> >>>>>>>>>>>> that.
> >>>>>>>>>>>>>>>>> Duplication isn't the way that repeated processing
> >>>>>>>>> manifests in
> >>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>> stores. Rather, it is in the form of dirty reads
> >>>>> during
> >>>>>>>>>>>> reprocessing.
> >>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>> feature may reduce the incidence of dirty reads
> >>>>> during
> >>>>>>>>>>>> reprocessing,
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>> not in a predictable way. During regular processing
> >>>>>> today,
> >>>>>>>>> we
> >>>>>>>>>>> will
> >>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>> some records through to the changelog in between
> >>>>> commit
> >>>>>>>>>>> intervals.
> >>>>>>>>>>>>>> Under
> >>>>>>>>>>>>>>>>> ALOS, if any of those dirty writes gets committed to
> >>>>> the
> >>>>>>>>>>> changelog
> >>>>>>>>>>>>>> topic,
> >>>>>>>>>>>>>>>>> then upon failure, we have to roll the store forward
> >>>>> to
> >>>>>>> them
> >>>>>>>>>>>> anyway,
> >>>>>>>>>>>>>>>>> regardless of this new transactional mechanism.
> >>>>> That's a
> >>>>>>>>>> fixable
> >>>>>>>>>>>>>> problem,
> >>>>>>>>>>>>>>>>> by the way, but this KIP doesn't seem to fix it. I
> >>>>>> wonder
> >>>>>>>>> if we
> >>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>> any claims about the relationship of this feature to
> >>>>>> ALOS
> >>>>>>> if
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> real-world
> >>>>>>>>>>>>>>>>> behavior is so complex.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 4. IQ
> >>>>>>>>>>>>>>>>> As a reminder, we have a new IQv2 mechanism now.
> >>>>> Should
> >>>>>> we
> >>>>>>>>>>> propose
> >>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>> changes to IQv1 to support this transactional
> >>>>> mechanism,
> >>>>>>>>> versus
> >>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>> proposing it for IQv2? Certainly, it seems strange
> >>>>> only
> >>>>>> to
> >>>>>>>>>>> propose
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>> for IQv1 and not v2.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding your proposal for IQv1, I'm unsure what the
> >>>>>>>>> behavior
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> for readCommitted, since the current behavior also
> >>>>> reads
> >>>>>>>>> out of
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> RecordCache. I guess if readCommitted==false, then we
> >>>>>> will
> >>>>>>>>>>> continue
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>> from the cache first, then the Batch, then the store;
> >>>>>> and
> >>>>>>> if
> >>>>>>>>>>>>>>>>> readCommitted==true, we would skip the cache and the
> >>>>>> Batch
> >>>>>>>>> and
> >>>>>>>>>>> only
> >>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>> from the persistent RocksDB store?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What should IQ do if I request to readCommitted on a
> >>>>>>>>>>>>> non-transactional
> >>>>>>>>>>>>>>>>> store?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks again for proposing the KIP, and my apologies
> >>>>> for
> >>>>>>> the
> >>>>>>>>>> long
> >>>>>>>>>>>>>> reply;
> >>>>>>>>>>>>>>>>> I'm hoping to air all my concerns in one "batch" to
> >>>>> save
> >>>>>>>>> time
> >>>>>>>>>> for
> >>>>>>>>>>>>> you.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov
> >>>>>>> wrote:
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've written a KIP for making Kafka Streams state
> >>>>>> stores
> >>>>>>>>>>>>> transactional
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> would like to start a discussion:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Alex
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Suhas Satish
> >>>>>>>>>>>>> Engineering Manager
> >>>>>>>>>>>>> Confluent <https://www.confluent.io>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
