Hello Alex,

Thanks for the replies. Regarding the global config vs. per-store spec, I
agree with John's earlier comments to some degree, but I think we should
distinguish a couple of scenarios here. In sum, we are discussing the
following levels of per-store spec:

* Materialized#transactional()
* StoreSupplier#transactional()
* StateStore#transactional()
* Stores.persistentTransactionalKeyValueStore()...

And my thoughts are the following:

* In the current proposal users could specify a transactional store either as
"Materialized.as("storeName").withTransactionsEnabled()" or as
"Materialized.as(Stores.persistentTransactionalKeyValueStore(..))", which
seems unnecessary to me. In general, the more options the library
provides, the harder it is for users to learn the new APIs.

* When using built-in stores, users would usually go with
Materialized.as("storeName"). In such cases I feel it's not very meaningful
to specify "some of the built-in stores to be transactional, while others
be non-transactional": as long as one of your stores is non-transactional,
you'd still pay a large restoration cost upon unclean failure. People
may, indeed, want to specify different transactional mechanisms to be
used across stores; but for whether or not the stores should be
transactional, I feel it's really an "all or none" answer, and our built-in
implementation (RocksDB) should support transactionality for all store
types.

* When using customized stores, users would usually go with
Materialized.as(StoreSupplier). And it's possible that users would choose
some stores to be transactional while others are non-transactional (e.g. if
their customized store only supports transactions for some store types, but
not others).

* At a per-store level, the library does not really care, or need to know,
whether that store is transactional or not at runtime, except that for
compatibility reasons today we want to make sure the written checkpoint
files do not include non-transactional stores. But this check would
eventually go away, as one day we would always write checkpoint files.

---------------------------

With all of that in mind, my gut feeling is that:

* Materialized#transactional(): we would not need this knob, since for
built-in stores I think a global config should be sufficient (see
below), while for customized stores users would need to specify that via
the StoreSupplier anyway and not through this API. Hence I think in either
case we do not need to expose such a knob at the Materialized level.

* Stores.persistentTransactionalKeyValueStore(): I think we could refactor
that function so that we do not introduce new constructors in the Stores
factory, but instead add new overloads to the existing function name, e.g.

```
persistentKeyValueStore(final String name, final boolean transactional)
```
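
For illustration, using such an overload from the DSL could then look
roughly like this (just a sketch: the boolean parameter is the hypothetical
overload above, and "counts" is only an example store name):

```
// Sketch only: this boolean overload of persistentKeyValueStore does not
// exist today; it is the proposed variant from above.
final KeyValueBytesStoreSupplier supplier =
    Stores.persistentKeyValueStore("counts", true /* transactional */);

stream.groupByKey()
      .count(Materialized.<String, Long>as(supplier));
```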

Plus we can augment the StoreImplType introduced in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
as syntactic sugar for users, e.g.

```
public enum StoreImplType {
    ROCKS_DB,
    TXN_ROCKS_DB,
    IN_MEMORY
}
```

```
stream.groupByKey().count(Materialized.withStoreType(StoreImplType.TXN_ROCKS_DB));
```

The above effectively provides the global config at the store-impl-type level.
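
For example, a user could then make the transactional RocksDB type the
application-wide default via the KIP-591 config (the "default.dsl.store" key
is the one from KIP-591; the "txn_rocks_db" value is hypothetical and
assumes the extended enum above):

```
final Properties props = new Properties();
// "default.dsl.store" comes from KIP-591; "txn_rocks_db" is a hypothetical
// value that would only be valid if StoreImplType gains TXN_ROCKS_DB.
props.put("default.dsl.store", "txn_rocks_db");
```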

* RocksDBTransactionalMechanism: I agree with Bruno that we had better not
expose this knob to users, but rather keep it purely as an implementation
detail abstracted behind the "TXN_ROCKS_DB" type. Over time we may, e.g.,
use in-memory stores as the secondary stores with optional spill-to-disk
when we hit the memory limit, but all such future optimizations should be
kept away from the users.

* StoreSupplier#transactional() / StateStore#transactional(): the first
flag is only used to pass the information down into the StateStore layer,
indicating whether we should write checkpoints; we could mark it as
@Evolving so that we can one day remove it without a long deprecation
period.
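
To make that concrete, here is a rough sketch of what I have in mind (the
default implementation and the comments are my assumptions, not settled
API):

```
public interface StateStore {
    // ... existing methods (init, flush, close, etc.) ...

    // Sketch: reports whether the store buffers uncommitted writes. Defaults
    // to the current non-transactional behavior. We would tag this with the
    // Evolving stability annotation so it can be removed later without a long
    // deprecation period; StoreSupplier#transactional() would mirror the flag.
    default boolean transactional() {
        return false;
    }
}
```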


Guozhang








On Wed, Aug 10, 2022 at 8:04 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hey Guozhang, Bruno,
>
> Thank you for your feedback. I am going to respond to both of you in a
> single email. I hope it is okay.
>
> @Guozhang,
>
> We could, instead, have a global
> > config to specify if the built-in stores should be transactional or not.
>
>
> This was the original approach I took in this proposal. Earlier in this
> thread John, Sagar, and Bruno listed a number of issues with it. I tend to
> agree with them that it is probably better user experience to control
> transactionality via Materialized objects.
>
> We could simplify our implementation for `commit`
>
> Agreed! I updated the prototype and removed references to the commit marker
> and rolling forward from the proposal.
>
>
> @Bruno,
>
> So, I would remove the details about the 2-state-store implementation
> > from the KIP or provide it as an example of a possible implementation at
> > the end of the KIP.
> >
> I moved the section about the 2-state-store implementation to the bottom of
> the proposal and always mention it as a reference implementation. Please
> let me know if this is okay.
>
> Could you please describe the usage of commit() and recover() in the
> > commit workflow in the KIP as we did in this thread but independently
> > from the state store implementation?
>
> I described how commit/recover change the workflow in the Overview section.
>
> Best,
> Alex
>
> On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi Alex,
> >
> > Thank a lot for explaining!
> >
> > Now some aspects are clearer to me.
> >
> > While I understand now, how the state store can roll forward, I have the
> > feeling that rolling forward is specific to the 2-state-store
> > implementation with RocksDB of your PoC. Other state store
> > implementations might use a different strategy to react to crashes. For
> > example, they might apply an atomic write and effectively rollback if
> > they crash before committing the state store transaction. I think the
> > KIP should not contain such implementation details but provide an
> > interface to accommodate rolling forward and rolling backward.
> >
> > So, I would remove the details about the 2-state-store implementation
> > from the KIP or provide it as an example of a possible implementation at
> > the end of the KIP.
> >
> > Since a state store implementation can roll forward or roll back, I
> > think it is fine to return the changelog offset from recover(). With the
> > returned changelog offset, Streams knows from where to start state store
> > restoration.
> >
> > Could you please describe the usage of commit() and recover() in the
> > commit workflow in the KIP as we did in this thread but independently
> > from the state store implementation? That would make things clearer.
> > Additionally, descriptions of failure scenarios would also be helpful.
> >
> > Best,
> > Bruno
> >
> >
> > On 04.08.22 16:39, Alexander Sorokoumov wrote:
> > > Hey Bruno,
> > >
> > > Thank you for the suggestions and the clarifying questions. I believe
> > that
> > > they cover the core of this proposal, so it is crucial for us to be on
> > the
> > > same page.
> > >
> > > 1. Don't you want to deprecate StateStore#flush().
> > >
> > >
> > > Good call! I updated both the proposal and the prototype.
> > >
> > >   2. I would shorten Materialized#withTransactionalityEnabled() to
> > >> Materialized#withTransactionsEnabled().
> > >
> > >
> > > Turns out, these methods are no longer necessary. I removed them from
> the
> > > proposal and the prototype.
> > >
> > >
> > >> 3. Could you also describe a bit more in detail where the offsets
> passed
> > >> into commit() and recover() come from?
> > >
> > >
> > > The offset passed into StateStore#commit is the last offset committed
> to
> > > the changelog topic. The offset passed into StateStore#recover is the
> > last
> > > checkpointed offset for the given StateStore. Let's look at steps 3
> and 4
> > > in the commit workflow. After the TaskExecutor/TaskManager commits, it
> > calls
> > > StreamTask#postCommit[1] that in turn:
> > > a. updates the changelog offsets via
> > > ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come
> > from
> > > the RecordCollector[3], which tracks the latest offsets the producer
> sent
> > > without exception[4, 5].
> > > b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6].
> > This
> > > method essentially calls ProcessorStateManager methods -
> flush/commit[7]
> > > and checkpoint[8]. ProcessorStateManager#commit goes over all state
> > stores
> > > that belong to that task and commits them with the offset obtained in
> > step
> > > `a`. ProcessorStateManager#checkpoint writes down those offsets for all
> > > state stores, except for non-transactional ones in the case of EOS.
> > >
> > > During initialization, StreamTask calls
> > > StateManagerUtil#registerStateStores[8] that in turn calls
> > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the
> > > moment, this method assigns checkpointed offsets to the corresponding
> > state
> > > stores[10]. The prototype also calls StateStore#recover with the
> > > checkpointed offset and assigns the offset returned by recover()[11].
> > >
> > > 4. I do not quite understand how a state store can roll forward. You
> > >> mention in the thread the following:
> > >
> > >
> > > The 2-state-stores commit looks like this [12]:
> > >
> > >     1. Flush the temporary state store.
> > >     2. Create a commit marker with a changelog offset corresponding to
> > the
> > >     state we are committing.
> > >     3. Go over all keys in the temporary store and write them down to
> the
> > >     main one.
> > >     4. Wipe the temporary store.
> > >     5. Delete the commit marker.
> > >
> > >
> > > Let's consider crash failure scenarios:
> > >
> > >     - Crash failure happens between steps 1 and 2. The main state store
> > is
> > >     in a consistent state that corresponds to the previously
> checkpointed
> > >     offset. StateStore#recover throws away the temporary store and
> > proceeds
> > >     from the last checkpointed offset.
> > >     - Crash failure happens between steps 2 and 3. We do not know what
> > keys
> > >     from the temporary store were already written to the main store, so
> > we
> > >     can't roll back. There are two options - either wipe the main store
> > or roll
> > >     forward. Since the point of this proposal is to avoid situations
> > where we
> > >     throw away the state and we do not care to what consistent state
> the
> > store
> > >     rolls to, we roll forward by continuing from step 3.
> > >     - Crash failure happens between steps 3 and 4. We can't distinguish
> > >     between this and the previous scenario, so we write all the keys
> > from the
> > >     temporary store. This is okay because the operation is idempotent.
> > >     - Crash failure happens between steps 4 and 5. Again, we can't
> > >     distinguish between this and previous scenarios, but the temporary
> > store is
> > >     already empty. Even though we write all keys from the temporary
> > store, this
> > >     operation is, in fact, no-op.
> > >     - Crash failure happens between step 5 and checkpoint. This is the
> > case
> > >     you referred to in question 5. The commit is finished, but it is
> not
> > >     reflected at the checkpoint. recover() returns the offset of the
> > previous
> > >     commit here, which is incorrect, but it is okay because we will
> > replay the
> > >     changelog from the previously committed offset. As changelog replay
> > is
> > >     idempotent, the state store recovers into a consistent state.
> > >
> > > The last crash failure scenario is a natural transition to
> > >
> > > how should Streams know what to write into the checkpoint file
> > >> after the crash?
> > >>
> > >
> > > As mentioned above, the Streams app writes the checkpoint file after
> the
> > > Kafka transaction and then the StateStore commit. Same as without the
> > > proposal, it should write the committed offset, as it is the same for
> > both
> > > the Kafka changelog and the state store.
> > >
> > >
> > >> This issue arises because we store the offset outside of the state
> > >> store. Maybe we need an additional method on the state store interface
> > >> that returns the offset at which the state store is.
> > >
> > >
> > > In my opinion, we should include in the interface only the guarantees
> > that
> > > are necessary to preserve EOS without wiping the local state. This way,
> > we
> > > allow more room for possible implementations. Thanks to the idempotency
> > of
> > > the changelog replay, it is "good enough" if StateStore#recover returns
> > the
> > > offset that is less than what it actually is. The only limitation here
> is
> > > that the state store should never commit writes that are not yet
> > committed
> > > in Kafka changelog.
> > >
> > > Please let me know what you think about this. First of all, I am
> > relatively
> > > new to the codebase, so I might be wrong in my understanding of
> > > how it works. Second, while writing this, it occurred to me that the
> > > StateStore#recover interface method is not as straightforward as it
> > > could be.
> > > Maybe we can change it like that:
> > >
> > > /**
> > >      * Recover a transactional state store
> > >      * <p>
> > >      * If a transactional state store shut down with a crash failure,
> > this
> > > method ensures that the
> > >      * state store is in a consistent state that corresponds to {@code
> > > changelogOffset} or later.
> > >      *
> > >      * @param changelogOffset the checkpointed changelog offset.
> > >      * @return {@code true} if recovery succeeded, {@code false}
> > otherwise.
> > >      */
> > > boolean recover(final Long changelogOffset) {
> > >
> > > Note: all links below except for [10] lead to the prototype's code.
> > > 1.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
> > > 2.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
> > > 3.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
> > > 4.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
> > > 5.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
> > > 6.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
> > > 7.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
> > > 8.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
> > > 9.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
> > > 10.
> > >
> >
> https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
> > > 11.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
> > > 12.
> > >
> >
> https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88
> > >
> > > Best,
> > > Alex
> > >
> > > On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org>
> > wrote:
> > >
> > >> Hi Alex,
> > >>
> > >> Thanks for the updates!
> > >>
> > >> 1. Don't you want to deprecate StateStore#flush(). As far as I
> > >> understand, commit() is the new flush(), right? If you do not
> deprecate
> > >> it, you don't get rid of the error room you describe in your KIP by
> > >> having a flush() and a commit().
> > >>
> > >>
> > >> 2. I would shorten Materialized#withTransactionalityEnabled() to
> > >> Materialized#withTransactionsEnabled().
> > >>
> > >>
> > >> 3. Could you also describe a bit more in detail where the offsets
> passed
> > >> into commit() and recover() come from?
> > >>
> > >>
> > >> For my next two points, I need the commit workflow that you were so
> kind
> > >> to post into this thread:
> > >>
> > >> 1. write stuff to the state store
> > >> 2. producer.sendOffsetsToTransaction(token);
> > producer.commitTransaction();
> > >> 3. flush (<- that would be call to commit(), right?)
> > >> 4. checkpoint
> > >>
> > >>
> > >> 4. I do not quite understand how a state store can roll forward. You
> > >> mention in the thread the following:
> > >>
> > >> "If the crash failure happens during #3, the state store can roll
> > >> forward and finish the flush/commit."
> > >>
> > >> How does the state store know where it stopped the flushing when it
> > >> crashed?
> > >>
> > >> This seems an optimization to me. I think in general the state store
> > >> should rollback to the last successfully committed state and restore
> > >> from there until the end of the changelog topic partition. The last
> > >> committed state is the offsets in the checkpoint file.
> > >>
> > >>
> > >> 5. In the same e-mail from point 4, you also state:
> > >>
> > >> "If the crash failure happens between #3 and #4, the state store
> should
> > >> do nothing during recovery and just proceed with the checkpoint."
> > >>
> > >> How should Streams know that the failure was between #3 and #4 during
> > >> recovery? It just sees a valid state store and a valid checkpoint
> file.
> > >> Streams does not know that the state of the checkpoint file does not
> > >> match with the committed state of the state store.
> > >> Also, how should Streams know what to write into the checkpoint file
> > >> after the crash?
> > >> This issue arises because we store the offset outside of the state
> > >> store. Maybe we need an additional method on the state store interface
> > >> that returns the offset at which the state store is.
> > >>
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >>
> > >>
> > >> On 27.07.22 11:51, Alexander Sorokoumov wrote:
> > >>> Hey Nick,
> > >>>
> > >>> Thank you for the kind words and the feedback! I'll definitely add an
> > >>> option to configure the transactional mechanism in Stores factory
> > method
> > >>> via an argument as John previously suggested and might add the
> > in-memory
> > >>> option via RocksDB Indexed Batches if I figure why their creation via
> > >>> rocksdb jni fails with `UnsatisfiedLinkException`.
> > >>>
> > >>> Best,
> > >>> Alex
> > >>>
> > >>> On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov <
> > >>> asorokou...@confluent.io> wrote:
> > >>>
> > >>>> Hey Guozhang,
> > >>>>
> > >>>> 1) About the param passed into the `recover()` function: it seems to
> > me
> > >>>>> that the semantics of "recover(offset)" is: recover this state to a
> > >>>>> transaction boundary which is at least the passed-in offset. And
> the
> > >> only
> > >>>>> possibility that the returned offset is different than the
> passed-in
> > >>>>> offset
> > >>>>> is that if the previous failure happens after we've done all the
> > commit
> > >>>>> procedures except writing the new checkpoint, in which case the
> > >> returned
> > >>>>> offset would be larger than the passed-in offset. Otherwise it
> should
> > >>>>> always be equal to the passed-in offset, is that right?
> > >>>>
> > >>>>
> > >>>> Right now, the only case when `recover` returns an offset different
> > from
> > >>>> the passed one is when the failure happens *during* commit.
> > >>>>
> > >>>>
> > >>>> If the failure happens after commit but before the checkpoint,
> > `recover`
> > >>>> might return either a passed or newer committed offset, depending on
> > the
> > >>>> implementation. The `recover` implementation in the prototype
> returns
> > a
> > >>>> passed offset because it deletes the commit marker that holds that
> > >> offset
> > >>>> after the commit is done. In that case, the store will replay the
> last
> > >>>> commit from the changelog. I think it is fine as the changelog
> replay
> > is
> > >>>> idempotent.
> > >>>>
> > >>>> 2) It seems the only use for the "transactional()" function is to
> > >> determine
> > >>>>> if we can update the checkpoint file while in EOS.
> > >>>>
> > >>>>
> > >>>> Right now, there are 2 other uses for `transactional()`:
> > >>>> 1. To determine what to do during initialization if the checkpoint
> is
> > >> gone
> > >>>> (see [1]). If the state store is transactional, we don't have to
> wipe
> > >> the
> > >>>> existing data. Thinking about it now, we do not really need this
> check
> > >>>> whether the store is `transactional` because if it is not, we'd not
> > have
> > >>>> written the checkpoint in the first place. I am going to remove that
> > >> check.
> > >>>> 2. To determine if the persistent kv store in KStreamImplJoin should
> > be
> > >>>> transactional (see [2], [3]).
> > >>>>
> > >>>> I am not sure if we can get rid of the checks in point 2. If so, I'd
> > be
> > >>>> happy to encapsulate `transactional()` logic in `commit/recover`.
> > >>>>
> > >>>> Best,
> > >>>> Alex
> > >>>>
> > >>>> 1.
> > >>>>
> > >>
> >
> https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
> > >>>> 2.
> > >>>>
> > >>
> >
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
> > >>>> 3.
> > >>>>
> > >>
> >
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354
> > >>>>
> > >>>> On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <
> nick.telf...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Alex,
> > >>>>>
> > >>>>> Excellent proposal, I'm very keen to see this land!
> > >>>>>
> > >>>>> Would it be useful to permit configuring the type of store used for
> > >>>>> uncommitted offsets on a store-by-store basis? This way, users
> could
> > >>>>> choose
> > >>>>> whether to use, e.g. an in-memory store or RocksDB, potentially
> > >> reducing
> > >>>>> the overheads associated with RocksDb for smaller stores, but
> without
> > >> the
> > >>>>> memory pressure issues?
> > >>>>>
> > >>>>> I suspect that in most cases, the number of uncommitted records
> will
> > be
> > >>>>> very small, because the default commit interval is 100ms.
> > >>>>>
> > >>>>> Regards,
> > >>>>>
> > >>>>> Nick
> > >>>>>
> > >>>>> On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>>> Hello Alex,
> > >>>>>>
> > >>>>>> Thanks for the updated KIP, I looked over it and browsed the WIP
> and
> > >>>>> just
> > >>>>>> have a couple meta thoughts:
> > >>>>>>
> > >>>>>> 1) About the param passed into the `recover()` function: it seems
> to
> > >> me
> > >>>>>> that the semantics of "recover(offset)" is: recover this state to
> a
> > >>>>>> transaction boundary which is at least the passed-in offset. And
> the
> > >>>>> only
> > >>>>>> possibility that the returned offset is different than the
> passed-in
> > >>>>> offset
> > >>>>>> is that if the previous failure happens after we've done all the
> > >> commit
> > >>>>>> procedures except writing the new checkpoint, in which case the
> > >> returned
> > >>>>>> offset would be larger than the passed-in offset. Otherwise it
> > should
> > >>>>>> always be equal to the passed-in offset, is that right?
> > >>>>>>
> > >>>>>> 2) It seems the only use for the "transactional()" function is to
> > >>>>> determine
> > >>>>>> if we can update the checkpoint file while in EOS. But the purpose
> > of
> > >>>>> the
> > >>>>>> checkpoint file's offsets is just to tell "the local state's
> current
> > >>>>>> snapshot's progress is at least the indicated offsets" anyways,
> and
> > >> with
> > >>>>>> this KIP maybe we would just do:
> > >>>>>>
> > >>>>>> a) when in ALOS, upon failover: we set the starting offset as
> > >>>>>> checkpointed-offset, then restore() from changelog till the
> > >> end-offset.
> > >>>>>> This way we may restore some records twice.
> > >>>>>> b) when in EOS, upon failover: we first call
> > >>>>> recover(checkpointed-offset),
> > >>>>>> then set the starting offset as the returned offset (which may be
> > >> larger
> > >>>>>> than checkpointed-offset), then restore until the end-offset.
> > >>>>>>
> > >>>>>> So why not also:
> > >>>>>> c) we let the `commit()` function to also return an offset, which
> > >>>>> indicates
> > >>>>>> "checkpointable offsets".
> > >>>>>> d) for existing non-transactional stores, we just have a default
> > >>>>>> implementation of "commit()" which is simply a flush, and returns
> a
> > >>>>>> sentinel value like -1. Then later if we get checkpointable
> offsets
> > >> -1,
> > >>>>> we
> > >>>>>> do not write the checkpoint. Upon clean shutting down we can just
> > >>>>>> checkpoint regardless of the returned value from "commit".
> > >>>>>> e) for existing non-transactional stores, we just have a default
> > >>>>>> implementation of "recover()" which is to wipe out the local store
> > and
> > >>>>>> return offset 0 if the passed in offset is -1, otherwise if not -1
> > >> then
> > >>>>> it
> > >>>>>> indicates a clean shutdown in the last run, can this function is
> > just
> > >> a
> > >>>>>> no-op.
> > >>>>>>
> > >>>>>> In that case, we would not need the "transactional()" function
> > >> anymore,
> > >>>>>> since for non-transactional stores their behaviors are still
> wrapped
> > >> in
> > >>>>> the
> > >>>>>> `commit / recover` function pairs.
> > >>>>>>
> > >>>>>> I have not completed the thorough pass on your WIP PR, so maybe I
> > >> could
> > >>>>>> come up with some more feedback later, but just let me know if my
> > >>>>>> understanding above is correct or not?
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov
> > >>>>>> <asorokou...@confluent.io.invalid> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> I updated the KIP with the following changes:
> > >>>>>>> * Replaced in-memory batches with the secondary-store approach as
> > the
> > >>>>>>> default implementation to address the feedback about memory
> > pressure
> > >>>>> as
> > >>>>>>> suggested by Sagar and Bruno.
> > >>>>>>> * Introduced StateStore#commit and StateStore#recover methods as
> an
> > >>>>>>> extension of the rollback idea. @Guozhang, please see the comment
> > >>>>> below
> > >>>>>> on
> > >>>>>>> why I took a slightly different approach than you suggested.
> > >>>>>>> * Removed mentions of changes to IQv1 and IQv2. Transactional
> state
> > >>>>>> stores
> > >>>>>>> enable reading committed in IQ, but it is really an independent
> > >>>>> feature
> > >>>>>>> that deserves its own KIP. Conflating them unnecessarily
> increases
> > >> the
> > >>>>>>> scope for discussion, implementation, and testing in a single
> unit
> > of
> > >>>>>> work.
> > >>>>>>>
> > >>>>>>> I also published a prototype -
> > >>>>>> https://github.com/apache/kafka/pull/12393
> > >>>>>>> that implements changes described in the proposal.
> > >>>>>>>
> > >>>>>>> Regarding explicit rollback, I think it is a powerful idea that
> > >> allows
> > >>>>>>> other StateStore implementations to take a different path to the
> > >>>>>>> transactional behavior rather than keep 2 state stores. Instead
> of
> > >>>>>>> introducing a new commit token, I suggest using a changelog
> offset
> > >>>>> that
> > >>>>>>> already 1:1 corresponds to the materialized state. This works
> > nicely
> > >>>>>>> because Kafka Stream first commits an AK transaction and only
> then
> > >>>>>>> checkpoints the state store, so we can use the changelog offset
> to
> > >>>>> commit
> > >>>>>>> the state store transaction.
> > >>>>>>>
> > >>>>>>> I called the method StateStore#recover rather than
> > >> StateStore#rollback
> > >>>>>>> because a state store might either roll back or forward depending
> > on
> > >>>>> the
> > >>>>>>> specific point of the crash failure.Consider the write algorithm
> in
> > >>>>> Kafka
> > >>>>>>> Streams is:
> > >>>>>>> 1. write stuff to the state store
> > >>>>>>> 2. producer.sendOffsetsToTransaction(token);
> > >>>>>> producer.commitTransaction();
> > >>>>>>> 3. flush
> > >>>>>>> 4. checkpoint
> > >>>>>>>
> > >>>>>>> Let's consider 3 cases:
> > >>>>>>> 1. If the crash failure happens between #2 and #3, the state
> store
> > >>>>> rolls
> > >>>>>>> back and replays the uncommitted transaction from the changelog.
> > >>>>>>> 2. If the crash failure happens during #3, the state store can
> roll
> > >>>>>> forward
> > >>>>>>> and finish the flush/commit.
> > >>>>>>> 3. If the crash failure happens between #3 and #4, the state
> store
> > >>>>> should
> > >>>>>>> do nothing during recovery and just proceed with the checkpoint.
> > >>>>>>>
> > >>>>>>> Looking forward to your feedback,
> > >>>>>>> Alexander
> > >>>>>>>
> > >>>>>>> On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
> > >>>>>>> asorokou...@confluent.io> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> As a status update, I did the following changes to the KIP:
> > >>>>>>>> * replaced configuration via the top-level config with
> > configuration
> > >>>>>> via
> > >>>>>>>> Stores factory and StoreSuppliers,
> > >>>>>>>> * added IQv2 and elaborated how readCommitted will work when the
> > >>>>> store
> > >>>>>> is
> > >>>>>>>> not transactional,
> > >>>>>>>> * removed claims about ALOS.
> > >>>>>>>>
> > >>>>>>>> I am going to be OOO in the next couple of weeks and will resume
> > >>>>>> working
> > >>>>>>>> on the proposal and responding to the discussion in this thread
> > >>>>>> starting
> > >>>>>>>> June 27. My next top priorities are:
> > >>>>>>>> 1. Prototype the rollback approach as suggested by Guozhang.
> > >>>>>>>> 2. Replace in-memory batches with the secondary-store approach
> as
> > >>>>> the
> > >>>>>>>> default implementation to address the feedback about memory
> > >>>>> pressure as
> > >>>>>>>> suggested by Sagar and Bruno.
> > >>>>>>>> 3. Adjust Stores methods to make transactional implementations
> > >>>>>> pluggable.
> > >>>>>>>> 4. Publish the POC for the first review.
> > >>>>>>>>
> > >>>>>>>> Best regards,
> > >>>>>>>> Alex
> > >>>>>>>>
> > >>>>>>>> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <
> wangg...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Alex,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for your replies! That is very helpful.
> > >>>>>>>>>
> > >>>>>>>>> Just to broaden our discussions a bit here, I think there are
> > some
> > >>>>>> other
> > >>>>>>>>> approaches in parallel to the idea of "enforce to only persist
> > upon
> > >>>>>>>>> explicit flush" and I'd like to throw one here -- not really
> > >>>>>> advocating
> > >>>>>>>>> it,
> > >>>>>>>>> but just for us to compare the pros and cons:
> > >>>>>>>>>
> > >>>>>>>>> 1) We let the StateStore's `flush` function to return a token
> > >>>>> instead
> > >>>>>> of
> > >>>>>>>>> returning `void`.
> > >>>>>>>>> 2) We add another `rollback(token)` interface of StateStore
> which
> > >>>>>> would
> > >>>>>>>>> effectively rollback the state as indicated by the token to the
> > >>>>>> snapshot
> > >>>>>>>>> when the corresponding `flush` is called.
> > >>>>>>>>> 3) We encode the token and commit as part of
> > >>>>>>>>> `producer#sendOffsetsToTransaction`.
> > >>>>>>>>>
> > >>>>>>>>> Users could optionally implement the new functions, or they can
> > >>>>> just
> > >>>>>> not
> > >>>>>>>>> return the token at all and not implement the second function.
> > >>>>> Again,
> > >>>>>>> the
> > >>>>>>>>> APIs are just for the sake of illustration, not feeling they
> are
> > >>>>> the
> > >>>>>>> most
> > >>>>>>>>> natural :)
> > >>>>>>>>>
> > >>>>>>>>> Then the procedure would be:
> > >>>>>>>>>
> > >>>>>>>>> 1. the previous checkpointed offset is 100
> > >>>>>>>>> ...
> > >>>>>>>>> 3. flush store, make sure all writes are persisted; get the
> > >>>>> returned
> > >>>>>>> token
> > >>>>>>>>> that indicates the snapshot of 200.
> > >>>>>>>>> 4. producer.sendOffsetsToTransaction(token);
> > >>>>>>> producer.commitTransaction();
> > >>>>>>>>> 5. Update the checkpoint file (say, the new value is 200).
> > >>>>>>>>>
> > >>>>>>>>> Then if there's a failure, say between 3/4, we would get the
> > token
> > >>>>>> from
> > >>>>>>>>> the
> > >>>>>>>>> last committed txn, and first we would do the restoration
> (which
> > >>>>> may
> > >>>>>> get
> > >>>>>>>>> the state to somewhere between 100 and 200), then call
> > >>>>>>>>> `store.rollback(token)` to rollback to the snapshot of offset
> > 100.
> > >>>>>>>>>
> > >>>>>>>>> The pros is that we would then not need to enforce the state
> > >>>>> stores to
> > >>>>>>> not
> > >>>>>>>>> persist any data during the txn: for stores that may not be
> able
> > to
> > >>>>>>>>> implement the `rollback` function, they can still reduce its
> impl
> > >>>>> to
> > >>>>>>> "not
> > >>>>>>>>> persisting any data" via this API, but for stores that can
> indeed
> > >>>>>>> support
> > >>>>>>>>> the rollback, their implementation may be more efficient. The
> > cons
> > >>>>>>> though,
> > >>>>>>>>> on top of my head are 1) more complicated logic differentiating
> > >>>>>> between
> > >>>>>>>>> EOS
> > >>>>>>>>> with and without store rollback support, and ALOS, 2) encoding
> > the
> > >>>>>> token
> > >>>>>>>>> as
> > >>>>>>>>> part of the commit offset is not ideal if it is big, 3) the
> > >>>>> recovery
> > >>>>>>> logic
> > >>>>>>>>> including the state store is also a bit more complicated.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov
> > >>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>
> > >>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it
> > >>>>> seems
> > >>>>>>>>> that we
> > >>>>>>>>>>> would achieve it by enforcing to not persist any data written
> > >>>>>> within
> > >>>>>>>>> this
> > >>>>>>>>>>> transaction until step 4. Is that correct?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> This is correct. Both alternatives - in-memory
> > >>>>> WriteBatchWithIndex
> > >>>>>> and
> > >>>>>>>>>> transactionality via the secondary store guarantee EOS by not
> > >>>>>>> persisting
> > >>>>>>>>>> data in the "main" state store until it is committed in the
> > >>>>>> changelog
> > >>>>>>>>>> topic.
> > >>>>>>>>>>
> > >>>>>>>>>> Oh what I meant is not what KStream code does, but that
> > >>>>> StateStore
> > >>>>>>> impl
> > >>>>>>>>>>> classes themselves could potentially flush data to become
> > >>>>>> persisted
> > >>>>>>>>>>> asynchronously
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Thank you for elaborating! You are correct, the underlying
> state
> > >>>>>> store
> > >>>>>>>>>> should not persist data until the streams app calls
> > >>>>>> StateStore#flush.
> > >>>>>>>>> There
> > >>>>>>>>>> are 2 options how a State Store implementation can guarantee
> > >>>>> that -
> > >>>>>>>>> either
> > >>>>>>>>>> keep uncommitted writes in memory or be able to roll back the
> > >>>>>> changes
> > >>>>>>>>> that
> > >>>>>>>>>> were not committed during recovery. RocksDB's
> > >>>>> WriteBatchWithIndex is
> > >>>>>>> an
> > >>>>>>>>>> implementation of the first option. A considered alternative,
> > >>>>>>>>> Transactions
> > >>>>>>>>>> via Secondary State Store for Uncommitted Changes, is the way
> to
> > >>>>>>>>> implement
> > >>>>>>>>>> the second option.
> > >>>>>>>>>>
> > >>>>>>>>>> As everyone correctly pointed out, keeping uncommitted data in
> > >>>>>> memory
> > >>>>>>>>>> introduces a very real risk of OOM that we will need to
> handle.
> > >>>>> The
> > >>>>>>>>> more I
> > >>>>>>>>>> think about it, the more I lean towards going with the
> > >>>>> Transactions
> > >>>>>>> via
> > >>>>>>>>>> Secondary Store as the way to implement transactionality as it
> > >>>>> does
> > >>>>>>> not
> > >>>>>>>>>> have that issue.
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Alex
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <
> > >>>>> wangg...@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello Alex,
> > >>>>>>>>>>>
> > >>>>>>>>>>>> we flush the cache, but not the underlying state store.
> > >>>>>>>>>>>
> > >>>>>>>>>>> You're right. The ordering I mentioned above is actually:
> > >>>>>>>>>>>
> > >>>>>>>>>>> ...
> > >>>>>>>>>>> 3. producer.sendOffsetsToTransaction();
> > >>>>>>> producer.commitTransaction();
> > >>>>>>>>>>> 4. flush store, make sure all writes are persisted.
> > >>>>>>>>>>> 5. Update the checkpoint file to 200.
> > >>>>>>>>>>>
> > >>>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it
> > >>>>>> seems
> > >>>>>>>>> that
> > >>>>>>>>>> we
> > >>>>>>>>>>> would achieve it by enforcing to not persist any data written
> > >>>>>> within
> > >>>>>>>>> this
> > >>>>>>>>>>> transaction until step 4. Is that correct?
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Can you please point me to the place in the codebase where
> we
> > >>>>>>>>> trigger
> > >>>>>>>>>>> async flush before the commit?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Oh what I meant is not what KStream code does, but that
> > >>>>> StateStore
> > >>>>>>>>> impl
> > >>>>>>>>>>> classes themselves could potentially flush data to become
> > >>>>>> persisted
> > >>>>>>>>>>> asynchronously, e.g. RocksDB does that naturally out of the
> > >>>>>> control
> > >>>>>>> of
> > >>>>>>>>>>> KStream code. I think it is related to my previous question:
> > >>>>> if we
> > >>>>>>>>> think
> > >>>>>>>>>> by
> > >>>>>>>>>>> guaranteeing EOS at the state store level, we would
> effectively
> > >>>>>> ask
> > >>>>>>>>> the
> > >>>>>>>>>>> impl classes that "you should not persist any data until
> > >>>>> `flush`
> > >>>>>> is
> > >>>>>>>>>> called
> > >>>>>>>>>>> explicitly", is the StateStore interface the right level to
> > >>>>>> enforce
> > >>>>>>>>> such
> > >>>>>>>>>>> mechanisms, or should we just do that on top of the
> > >>>>> StateStores,
> > >>>>>>> e.g.
> > >>>>>>>>>>> during the transaction we just keep all the writes in the
> cache
> > >>>>>> (of
> > >>>>>>>>>> course
> > >>>>>>>>>>> we need to consider how to work around memory pressure as
> > >>>>>> previously
> > >>>>>>>>>>> mentioned), and then upon committing, we just write the
> cached
> > >>>>>>> records
> > >>>>>>>>>> as a
> > >>>>>>>>>>> whole into the store and then call flush.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
> > >>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hey,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thank you for the wealth of great suggestions and questions!
> > >>>>> I
> > >>>>>> am
> > >>>>>>>>> going
> > >>>>>>>>>>> to
> > >>>>>>>>>>>> address the feedback in batches and update the proposal
> > >>>>> async,
> > >>>>>> as
> > >>>>>>>>> it is
> > >>>>>>>>>>>> probably going to be easier for everyone. I will also write
> a
> > >>>>>>>>> separate
> > >>>>>>>>>>>> message after making updates to the KIP.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> @John,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Did you consider instead just adding the option to the
> > >>>>>>>>>>>>> RocksDB*StoreSupplier classes and the factories in Stores ?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thank you for suggesting that. I think that this idea is
> > >>>>> better
> > >>>>>>> than
> > >>>>>>>>>>> what I
> > >>>>>>>>>>>> came up with and will update the KIP with configuring
> > >>>>>>>>> transactionality
> > >>>>>>>>>>> via
> > >>>>>>>>>>>> the suppliers and Stores.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> what is the advantage over just doing the same thing with
> the
> > >>>>>>>>>> RecordCache
> > >>>>>>>>>>>>> and not introducing the WriteBatch at all?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Can you point me to RecordCache? I can't find it in the
> > >>>>> project.
> > >>>>>>> The
> > >>>>>>>>>>>> advantage would be that WriteBatch guarantees write
> > >>>>> atomicity.
> > >>>>>> As
> > >>>>>>>>> far
> > >>>>>>>>>> as
> > >>>>>>>>>>> I
> > >>>>>>>>>>>> understood the way RecordCache works, it might leave the
> > >>>>> system
> > >>>>>> in
> > >>>>>>>>> an
> > >>>>>>>>>>>> inconsistent state during crash failure on write.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> You mentioned that a transactional store can help reduce
> > >>>>>>>>> duplication in
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> case of ALOS
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I will remove claims about ALOS from the proposal. Thank you
> > >>>>> for
> > >>>>>>>>>>>> elaborating!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> As a reminder, we have a new IQv2 mechanism now. Should we
> > >>>>>> propose
> > >>>>>>>>> any
> > >>>>>>>>>>>>> changes to IQv1 to support this transactional mechanism,
> > >>>>>> versus
> > >>>>>>>>> just
> > >>>>>>>>>>>>> proposing it for IQv2? Certainly, it seems strange only to
> > >>>>>>>>> propose a
> > >>>>>>>>>>>> change
> > >>>>>>>>>>>>> for IQv1 and not v2.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>    I will update the proposal with complementary API changes
> > >>>>> for
> > >>>>>>> IQv2
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> What should IQ do if I request to readCommitted on a
> > >>>>>>>>> non-transactional
> > >>>>>>>>>>>>> store?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We can assume that non-transactional stores commit on write,
> > >>>>> so
> > >>>>>> IQ
> > >>>>>>>>>> works
> > >>>>>>>>>>> in
> > >>>>>>>>>>>> the same way with non-transactional stores regardless of the
> > >>>>>> value
> > >>>>>>>>> of
> > >>>>>>>>>>>> readCommitted.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>    @Guozhang,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> * If we crash between line 3 and 4, then at that time the
> > >>>>> local
> > >>>>>>>>>>> persistent
> > >>>>>>>>>>>>> store image is representing as of offset 200, but upon
> > >>>>>> recovery
> > >>>>>>>>> all
> > >>>>>>>>>>>>> changelog records from 100 to log-end-offset would be
> > >>>>>> considered
> > >>>>>>>>> as
> > >>>>>>>>>>>> aborted
> > >>>>>>>>>>>>> and not be replayed and we would restart processing from
> > >>>>>>> position
> > >>>>>>>>>> 100.
> > >>>>>>>>>>>>> Restart processing will violate EOS.I'm not sure how e.g.
> > >>>>>>>>> RocksDB's
> > >>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and
> > >>>>> step 5
> > >>>>>>>>> could
> > >>>>>>>>>> be
> > >>>>>>>>>>>>> done atomically here.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Could you please point me to the place in the codebase where
> > >>>>> a
> > >>>>>>> task
> > >>>>>>>>>>> flushes
> > >>>>>>>>>>>> the store before committing the transaction?
> > >>>>>>>>>>>> Looking at TaskExecutor (
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> > >>>>>>>>>>>> ),
> > >>>>>>>>>>>> StreamTask#prepareCommit (
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> > >>>>>>>>>>>> ),
> > >>>>>>>>>>>> and CachedStateStore (
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> > >>>>>>>>>>>> )
> > >>>>>>>>>>>> we flush the cache, but not the underlying state store.
> > >>>>> Explicit
> > >>>>>>>>>>>> StateStore#flush happens in
> > >>>>> AbstractTask#maybeWriteCheckpoint (
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> > >>>>>>>>>>>> ).
> > >>>>>>>>>>>> Is there something I am missing here?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Today all cached data that have not been flushed are not
> > >>>>>> committed
> > >>>>>>>>> for
> > >>>>>>>>>>>>> sure, but even flushed data to the persistent underlying
> > >>>>> store
> > >>>>>>> may
> > >>>>>>>>>> also
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>> uncommitted since flushing can be triggered asynchronously
> > >>>>>>> before
> > >>>>>>>>> the
> > >>>>>>>>>>>>> commit.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Can you please point me to the place in the codebase where
> we
> > >>>>>>>>> trigger
> > >>>>>>>>>>> async
> > >>>>>>>>>>>> flush before the commit? This would certainly be a reason to
> > >>>>>>>>> introduce
> > >>>>>>>>>> a
> > >>>>>>>>>>>> dedicated StateStore#commit method.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks again for the feedback. I am going to update the KIP
> > >>>>> and
> > >>>>>>> then
> > >>>>>>>>>>>> respond to the next batch of questions and suggestions.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Alex
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Mon, May 30, 2022 at 5:13 PM Suhas Satish
> > >>>>>>>>>>> <ssat...@confluent.io.invalid
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP proposal Alex.
> > >>>>>>>>>>>>> 1. Configuration default
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> You mention applications using streams DSL with built-in
> > >>>>>> rocksDB
> > >>>>>>>>>> state
> > >>>>>>>>>>>>> store will get transactional state stores by default when
> > >>>>> EOS
> > >>>>>> is
> > >>>>>>>>>>> enabled,
> > >>>>>>>>>>>>> but the default implementation for apps using PAPI will
> > >>>>>> fallback
> > >>>>>>>>> to
> > >>>>>>>>>>>>> non-transactional behavior.
> > >>>>>>>>>>>>> Shouldn't we have the same default behavior for both types
> > >>>>> of
> > >>>>>>>>> apps -
> > >>>>>>>>>>> DSL
> > >>>>>>>>>>>>> and PAPI?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <
> > >>>>>>> cado...@apache.org
> > >>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks for the PR, Alex!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I am also glad to see this coming.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. Configuration
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I would also prefer to restrict the configuration of
> > >>>>>>>>> transactional
> > >>>>>>>>>> on
> > >>>>>>>>>>>>>> the state sore. Ideally, calling method transactional()
> > >>>>> on
> > >>>>>> the
> > >>>>>>>>>> state
> > >>>>>>>>>>>>>> store would be enough. An option on the store builder
> > >>>>> would
> > >>>>>>>>> make it
> > >>>>>>>>>>>>>> possible to turn transactionality on and off (as John
> > >>>>>>> proposed).
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2. Memory usage in RocksDB
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> This seems to be a major issue. We do not have any
> > >>>>> guarantee
> > >>>>>>>>> that
> > >>>>>>>>>>>>>> uncommitted writes fit into memory and I guess we will
> > >>>>> never
> > >>>>>>>>> have.
> > >>>>>>>>>>> What
> > >>>>>>>>>>>>>> happens when the uncommitted writes do not fit into
> > >>>>> memory?
> > >>>>>>> Does
> > >>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>> throw an exception? Can we handle such an exception
> > >>>>> without
> > >>>>>>>>>> crashing?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Does the RocksDB behavior even need to be included in
> > >>>>> this
> > >>>>>>> KIP?
> > >>>>>>>>> In
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>> end it is an implementation detail.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> What we should consider - though - is a memory limit in
> > >>>>> some
> > >>>>>>>>> form.
> > >>>>>>>>>>> And
> > >>>>>>>>>>>>>> what we do when the memory limit is exceeded.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3. PoC
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I agree with Guozhang that a PoC is a good idea to better
> > >>>>>>>>>> understand
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> devils in the details.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 25.05.22 01:52, Guozhang Wang wrote:
> > >>>>>>>>>>>>>>> Hello Alex,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for writing the proposal! Glad to see it
> > >>>>> coming. I
> > >>>>>>>>> think
> > >>>>>>>>>>> this
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> kind of a KIP that since too many devils would be
> > >>>>> buried
> > >>>>>> in
> > >>>>>>>>> the
> > >>>>>>>>>>>> details
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>> it's better to start working on a POC, either in
> > >>>>> parallel,
> > >>>>>>> or
> > >>>>>>>>>>> before
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> resume our discussion, rather than blocking any
> > >>>>>>> implementation
> > >>>>>>>>>>> until
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>> satisfied with the proposal.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Just as a concrete example, I personally am still not
> > >>>>> 100%
> > >>>>>>>>> clear
> > >>>>>>>>>>> how
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> proposal would work to achieve EOS with the state
> > >>>>> stores.
> > >>>>>>> For
> > >>>>>>>>>>>> example,
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> commit procedure today looks like this:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 0: there's an existing checkpoint file indicating the
> > >>>>>>>>> changelog
> > >>>>>>>>>>>> offset
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>> the local state store image is 100. Now a commit is
> > >>>>>>> triggered:
> > >>>>>>>>>>>>>>> 1. flush cache (since it contains partially processed
> > >>>>>>>>> records),
> > >>>>>>>>>>> make
> > >>>>>>>>>>>>> sure
> > >>>>>>>>>>>>>>> all records are written to the producer.
> > >>>>>>>>>>>>>>> 2. flush producer, making sure all changelog records
> > >>>>> have
> > >>>>>>> now
> > >>>>>>>>>>> acked.
> > >>>>>>>>>>>> //
> > >>>>>>>>>>>>>>> here we would get the new changelog position, say 200
> > >>>>>>>>>>>>>>> 3. flush store, make sure all writes are persisted.
> > >>>>>>>>>>>>>>> 4. producer.sendOffsetsToTransaction();
> > >>>>>>>>>>> producer.commitTransaction();
> > >>>>>>>>>>>>> //
> > >>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> would make the writes in changelog up to offset 200
> > >>>>>>> committed
> > >>>>>>>>>>>>>>> 5. Update the checkpoint file to 200.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The question about atomicity between those lines, for
> > >>>>>>> example:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> * If we crash between line 4 and line 5, the local
> > >>>>>>> checkpoint
> > >>>>>>>>>> file
> > >>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>> stay as 100, and upon recovery we would replay the
> > >>>>>> changelog
> > >>>>>>>>> from
> > >>>>>>>>>>> 100
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> 200. This is not ideal but does not violate EOS, since
> > >>>>> the
> > >>>>>>>>>>> changelogs
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>> all overwrites anyways.
> > >>>>>>>>>>>>>>> * If we crash between line 3 and 4, then at that time
> > >>>>> the
> > >>>>>>>>> local
> > >>>>>>>>>>>>>> persistent
> > >>>>>>>>>>>>>>> store image is representing as of offset 200, but upon
> > >>>>>>>>> recovery
> > >>>>>>>>>> all
> > >>>>>>>>>>>>>>> changelog records from 100 to log-end-offset would be
> > >>>>>>>>> considered
> > >>>>>>>>>> as
> > >>>>>>>>>>>>>> aborted
> > >>>>>>>>>>>>>>> and not be replayed and we would restart processing
> > >>>>> from
> > >>>>>>>>> position
> > >>>>>>>>>>>> 100.
> > >>>>>>>>>>>>>>> Restart processing will violate EOS.I'm not sure how
> > >>>>> e.g.
> > >>>>>>>>>> RocksDB's
> > >>>>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and
> > >>>>>>> step 5
> > >>>>>>>>>>> could
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>>> done atomically here.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Originally what I was thinking when creating the JIRA
> > >>>>>> ticket
> > >>>>>>>>> is
> > >>>>>>>>>>> that
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> need to let the state store to provide a transactional
> > >>>>> API
> > >>>>>>>>> like
> > >>>>>>>>>>>> "token
> > >>>>>>>>>>>>>>> commit()" used in step 4) above which returns a token,
> > >>>>>> that
> > >>>>>>>>> e.g.
> > >>>>>>>>>> in
> > >>>>>>>>>>>> our
> > >>>>>>>>>>>>>>> example above indicates offset 200, and that token
> > >>>>> would
> > >>>>>> be
> > >>>>>>>>>> written
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>> part
> > >>>>>>>>>>>>>>> of the records in Kafka transaction in step 5). And
> > >>>>> upon
> > >>>>>>>>> recovery
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>> store would have another API like "rollback(token)"
> > >>>>> where
> > >>>>>>> the
> > >>>>>>>>>> token
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> read
> > >>>>>>>>>>>>>>> from the latest committed txn, and be used to rollback
> > >>>>> the
> > >>>>>>>>> store
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>> committed image. I think your proposal is different,
> > >>>>> and
> > >>>>>> it
> > >>>>>>>>> seems
> > >>>>>>>>>>>> like
> > >>>>>>>>>>>>>>> you're proposing we swap step 3) and 4) above, but the
> > >>>>>>>>> atomicity
> > >>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>> still remains since now you may have the store image at
> > >>>>>> 100
> > >>>>>>>>> but
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> changelog is committed at 200. I'd like to learn more
> > >>>>>> about
> > >>>>>>>>> the
> > >>>>>>>>>>>> details
> > >>>>>>>>>>>>>>> on how it resolves such issues.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Anyways, that's just an example to make the point that
> > >>>>>> there
> > >>>>>>>>> are
> > >>>>>>>>>>> lots
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>> implementational details which would drive the public
> > >>>>> API
> > >>>>>>>>> design,
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> should probably first do a POC, and come back to
> > >>>>> discuss
> > >>>>>> the
> > >>>>>>>>> KIP.
> > >>>>>>>>>>> Let
> > >>>>>>>>>>>>> me
> > >>>>>>>>>>>>>>> know what you think?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 AM Sagar <
> > >>>>>>>>>> sagarmeansoc...@gmail.com>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>

Hi Alexander,

Thanks for the KIP! This seems like a great proposal. I have the same opinion as John on the Configuration part, though. The two-level config and its behaviour based on setting/unsetting the flag seems confusing to me as well. Since the KIP seems specifically centred around RocksDB, it might be better to add it at the Supplier level as John suggested.

On similar lines, the config name *statestore.transactional.mechanism* may also need rethinking, as the value assigned to it (rocksdb_indexbatch) implicitly assumes that RocksDB is the only state store Kafka Streams supports, which is not the case.

Also, regarding the potential memory pressure that can be introduced by WriteBatchIndex, do you think it might make more sense to include some numbers/benchmarks on how much the memory consumption might increase?

Lastly, the read_uncommitted flag's behaviour on IQ may need more elaboration.

These points aside, as I said, this is a great proposal!

Thanks!
Sagar.

On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Alex!

I'm really happy to see your proposal. This improvement fills a long-standing gap.

I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores. It is also common to use multiple types of state stores in the same application for different purposes.

Against this backdrop, the choice to configure transactionality as a top-level config, as well as to configure the store transaction mechanism as a top-level config, seems a bit off.

Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores? It seems like the desire to enable the feature by default, but with a feature flag to disable it, was a factor here. However, as you pointed out, there are some major considerations that users should be aware of, so opt-in doesn't seem like a bad choice, either. You could add an Enum argument to those factories like `RocksDBTransactionalMechanism.{NONE,

Some points in favor of this approach (a sketch follows the list):
* Avoid the "stores that don't support transactions ignore the config" complexity
* Users can choose how to spend their memory budget, making some stores transactional and others not
* When we add transactional support to in-memory stores, we don't have to figure out what to do with the mechanism config (i.e., what do you set the mechanism to when there are multiple kinds of transactional stores in the topology?)
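
To make that alternative concrete, a rough sketch of the Supplier-level opt-in being suggested might look like the following; the enum value beyond NONE and the commented factory overload are illustrative assumptions, not an agreed design:

```
// Illustrative sketch of a per-store opt-in; names and values are assumptions.
public enum RocksDBTransactionalMechanism {
    NONE,              // plain, non-transactional RocksDB store
    WRITE_BATCH_INDEX  // hypothetical: buffer uncommitted writes in a WriteBatchWithIndex
}

// A possible (hypothetical) overload on the existing factory, instead of a top-level config:
// Stores.persistentKeyValueStore("my-store", RocksDBTransactionalMechanism.WRITE_BATCH_INDEX)
```

With a per-store argument like this, the global mechanism config disappears and each store declares its own behaviour at the point where it is created.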

2. caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a bit troubling. It also occurs to me that there seems to be some relationship with the existing record cache, which is also an in-memory holding area for records that are not yet written to the cache and/or store (albeit with no particular semantics). Have you considered how all these components should relate? For example, should a "full" WriteBatch actually trigger a flush so that we don't get OOMEs? If the proposed transactional mechanism forces all uncommitted writes to be buffered in memory until a commit, then what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?
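
To make the memory question concrete, here is a rough, purely illustrative sketch of a byte-budget guard around the uncommitted buffer; the class, its names, and the idea of forcing an early flush are assumptions for discussion, not anything proposed in the KIP:

```
// Illustrative only: a byte-budget guard around an uncommitted-write buffer.
public final class BoundedUncommittedBuffer {

    private final long maxUncommittedBytes; // hypothetical memory budget
    private final Runnable earlyFlush;      // e.g. commit the batch to the underlying store
    private long uncommittedBytes = 0;

    public BoundedUncommittedBuffer(final long maxUncommittedBytes, final Runnable earlyFlush) {
        this.maxUncommittedBytes = maxUncommittedBytes;
        this.earlyFlush = earlyFlush;
    }

    public void onPut(final byte[] key, final byte[] value) {
        uncommittedBytes += key.length + (value == null ? 0 : value.length);
        if (uncommittedBytes > maxUncommittedBytes) {
            // Trade-off: flushing here bounds memory, but gives up exactly the
            // all-or-nothing behaviour the transactional mechanism is meant to provide.
            earlyFlush.run();
            uncommittedBytes = 0;
        }
    }
}
```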

3. ALOS
You mentioned that a transactional store can help reduce duplication in the case of ALOS. We might want to be careful about claims like that. Duplication isn't the way that repeated processing manifests in state stores. Rather, it is in the form of dirty reads during reprocessing. This feature may reduce the incidence of dirty reads during reprocessing, but not in a predictable way. During regular processing today, we will send some records through to the changelog in between commit intervals. Under ALOS, if any of those dirty writes gets committed to the changelog topic, then upon failure, we have to roll the store forward to them anyway, regardless of this new transactional mechanism. That's a fixable problem, by the way, but this KIP doesn't seem to fix it. I wonder if we should make any claims about the relationship of this feature to ALOS if the real-world behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.

Regarding your proposal for IQv1, I'm unsure what the behavior should be for readCommitted, since the current behavior also reads out of the RecordCache. I guess if readCommitted==false, then we will continue to read from the cache first, then the Batch, then the store; and if readCommitted==true, we would skip the cache and the Batch and only read from the persistent RocksDB store?
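
Spelled out, that read path would look roughly like the sketch below; the class and the three lookup functions are stand-ins introduced only for illustration:

```
import java.util.function.Function;

// Illustrative sketch of the IQv1 read path described above; the lookup functions
// stand in for the record cache, the uncommitted WriteBatch, and the RocksDB store.
public final class TransactionalReadPath {

    private final Function<byte[], byte[]> recordCache;
    private final Function<byte[], byte[]> uncommittedBatch;
    private final Function<byte[], byte[]> committedStore;

    public TransactionalReadPath(final Function<byte[], byte[]> recordCache,
                                 final Function<byte[], byte[]> uncommittedBatch,
                                 final Function<byte[], byte[]> committedStore) {
        this.recordCache = recordCache;
        this.uncommittedBatch = uncommittedBatch;
        this.committedStore = committedStore;
    }

    public byte[] get(final byte[] key, final boolean readCommitted) {
        if (readCommitted) {
            // Skip the cache and the batch: serve only committed data from the store.
            return committedStore.apply(key);
        }
        // Otherwise: cache first, then the uncommitted batch, then the store.
        final byte[] cached = recordCache.apply(key);
        if (cached != null) {
            return cached;
        }
        final byte[] buffered = uncommittedBatch.apply(key);
        return buffered != null ? buffered : committedStore.apply(key);
    }
}
```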

What should IQ do if I request readCommitted on a non-transactional store?

Thanks again for proposing the KIP, and my apologies for the long reply; I'm hoping to air all my concerns in one "batch" to save time for you.

Thanks,
-John

On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:

Hi all,

I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores

Best,
Alex


-- 
-- Guozhang
