Hi Lucas,

Thanks for looking over my KIP.

A) The bound is per-instance, not per-Task. This was a typo in the KIP that
I've now corrected. It was originally per-Task, but I changed it to
per-instance for exactly the reason you highlighted.
B) It's worth noting that transactionality is only enabled under EOS, and
in the default mode of operation (ALOS), there should be no change in
behavior at all. I think, under EOS, we can mitigate the impact on users by
sufficiently low default values for the memory bound configuration. I
understand your hesitation to include a significant change of behaviour,
especially in a minor release, but I suspect that most users will prefer
the memory impact (under EOS) to the existing behaviour of frequent state
restorations! If this is a problem, the changes can wait until the next
major release. I'll be running a patched version of streams in production
with these changes as soon as they're ready, so it won't disrupt me :-D
C) The main purpose of this sentence was just to note that some changes
will need to be made to the way Segments are handled in order to ensure
they also benefit from transactions. At the time I wrote it, I hadn't
figured out the specific changes necessary, so it was deliberately vague.
This is the one outstanding problem I'm currently working on, and I'll
update this section with more detail once I have figured out the exact
changes required.
D) newTransaction() provides the necessary isolation guarantees. While the
RocksDB implementation of transactions doesn't technically *need* read-only
users to call newTransaction(), other implementations (e.g. a hypothetical
PostgresStore) may require it. Calling newTransaction() when no transaction
is necessary is essentially free, as it will just return this.

I didn't do any profiling of the KIP-844 PoC, but I think it should be
fairly obvious where the performance problems stem from: writes under
KIP-844 require 3 extra memory-copies: 1 to encode it with the
tombstone/record flag, 1 to decode it from the tombstone/record flag, and 1
to copy the record from the "temporary" store to the "main" store, when the
transaction commits. The different approach taken by KIP-869 should perform
much better, as it avoids all these copies, and may actually perform
slightly better than trunk, due to batched writes in RocksDB performing
better than non-batched writes.[1]

Regards,
Nick

1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid>
wrote:

> Hi Nick,
>
> I'm just starting to read up on the whole discussion about KIP-892 and
> KIP-844. Thanks a lot for your work on this, I do think
> `WriteBatchWithIndex` may be the way to go here. I do have some
> questions about the latest draft.
>
>  A) If I understand correctly, you propose to put a bound on the
> (native) memory consumed by each task. However, I wonder if this is
> sufficient if we have temporary imbalances in the cluster. For
> example, depending on the timing of rebalances during a cluster
> restart, it could happen that a single streams node is assigned a lot
> more tasks than expected. With your proposed change, this would mean
> that the memory required by this one node could be a multiple of what
> is required during normal operation. I wonder if it wouldn't be safer
> to put a global bound on the memory use, across all tasks.
>  B) Generally, the memory concerns still give me the feeling that this
> should not be enabled by default for all users in a minor release.
>  C) In section "Transaction Management": the sentence "A similar
> analogue will be created to automatically manage `Segment`
> transactions.". Maybe this is just me lacking some background, but I
> do not understand this, it would be great if you could clarify what
> you mean here.
>  D) Could you please clarify why IQ has to call newTransaction(), when
> it's read-only.
>
> And one last thing not strictly related to your KIP: if there is an
> easy way for you to find out why the KIP-844 PoC is 20x slower (e.g.
> by providing a flame graph), that would be quite interesting.
>
> Cheers,
> Lucas
>
> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com>
> wrote:
> >
> > Hi everyone,
> >
> > I've updated the KIP with a more detailed design, which reflects the
> > implementation I've been working on:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >
> > This new design should address the outstanding points already made in the
> > thread.
> >
> > Please let me know if there are areas that are unclear or need more
> > clarification.
> >
> > I have a (nearly) working implementation. I'm confident that the
> remaining
> > work (making Segments behave) will not impact the documented design.
> >
> > Regards,
> >
> > Nick
> >
> > On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote:
> >
> > > Nick,
> > >
> > > Thank you for the reply; that makes sense. I was hoping that, since
> reading
> > > uncommitted records from IQ in EOS isn't part of the documented API,
> maybe
> > > you *wouldn't* have to wait for the next major release to make that
> change;
> > > but given that it would be considered a major change, I like your
> approach
> > > the best.
> > >
> > > Wishing you a speedy recovery and happy coding!
> > >
> > > Thanks,
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > >
> > > > Hi Colt,
> > > >
> > > > 10: Yes, I agree it's not ideal. I originally intended to try to
> keep the
> > > > behaviour unchanged as much as possible, otherwise we'd have to wait
> for
> > > a
> > > > major version release to land these changes.
> > > > 20: Good point, ALOS doesn't need the same level of guarantee, and
> the
> > > > typically longer commit intervals would be problematic when reading
> only
> > > > "committed" records.
> > > >
> > > > I've been away for 5 days recovering from minor surgery, but I spent
> a
> > > > considerable amount of that time working through ideas for possible
> > > > solutions in my head. I think your suggestion of keeping ALOS as-is,
> but
> > > > buffering writes for EOS is the right path forwards, although I have
> a
> > > > solution that both expands on this, and provides for some more formal
> > > > guarantees.
> > > >
> > > > Essentially, adding support to KeyValueStores for "Transactions",
> with
> > > > clearly defined IsolationLevels. Using "Read Committed" when under
> EOS,
> > > and
> > > > "Read Uncommitted" under ALOS.
> > > >
> > > > The nice thing about this approach is that it gives us much more
> clearly
> > > > defined isolation behaviour that can be properly documented to ensure
> > > users
> > > > know what to expect.
> > > >
> > > > I'm still working out the kinks in the design, and will update the
> KIP
> > > when
> > > > I have something. The main struggle is trying to implement this
> without
> > > > making any major changes to the existing interfaces or breaking
> existing
> > > > implementations, because currently everything expects to operate
> directly
> > > > on a StateStore, and not a Transaction of that store. I think I'm
> getting
> > > > close, although sadly I won't be able to progress much until next
> week
> > > due
> > > > to some work commitments.
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io>
> wrote:
> > > >
> > > > > Nick,
> > > > >
> > > > > Thank you for the explanation, and also for the updated KIP. I am
> quite
> > > > > eager for this improvement to be released as it would greatly
> reduce
> > > the
> > > > > operational difficulties of EOS streams apps.
> > > > >
> > > > > Two questions:
> > > > >
> > > > > 10)
> > > > > >When reading records, we will use the
> > > > > WriteBatchWithIndex#getFromBatchAndDB
> > > > >  and WriteBatchWithIndex#newIteratorWithBase utilities in order to
> > > ensure
> > > > > that uncommitted writes are available to query.
> > > > > Why do extra work to enable the reading of uncommitted writes
> during
> > > IQ?
> > > > > Code complexity aside, reading uncommitted writes is, in my
> opinion, a
> > > > > minor flaw in EOS IQ; it would be very nice to have the guarantee
> that,
> > > > > with EOS, IQ only reads committed records. In order to avoid dirty
> > > reads,
> > > > > one currently must query a standby replica (but this still doesn't
> > > fully
> > > > > guarantee monotonic reads).
> > > > >
> > > > > 20) Is it also necessary to enable this optimization on ALOS
> stores?
> > > The
> > > > > motivation of KIP-844 was mainly to reduce the need to restore
> state
> > > from
> > > > > scratch on unclean EOS shutdowns; with ALOS it was acceptable to
> accept
> > > > > that there may have been uncommitted writes on disk. On a side
> note, if
> > > > you
> > > > > enable this type of store on ALOS processors, the community would
> > > > > definitely want to enable queries on dirty reads; otherwise users
> would
> > > > > have to wait 30 seconds (default) to see an update.
> > > > >
> > > > > Thank you for doing this fantastic work!
> > > > > Colt McNealy
> > > > > *Founder, LittleHorse.io*
> > > > >
> > > > >
> > > > > On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <
> nick.telf...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I've drastically reduced the scope of this KIP to no longer
> include
> > > the
> > > > > > StateStore management of checkpointing. This can be added as a
> KIP
> > > > later
> > > > > on
> > > > > > to further optimize the consistency and performance of state
> stores.
> > > > > >
> > > > > > I've also added a section discussing some of the concerns around
> > > > > > concurrency, especially in the presence of Iterators. I'm
> thinking of
> > > > > > wrapping WriteBatchWithIndex with a reference-counting
> copy-on-write
> > > > > > implementation (that only makes a copy if there's an active
> > > iterator),
> > > > > but
> > > > > > I'm open to suggestions.
> > > > > >
> > > > > > Regards,
> > > > > > Nick
> > > > > >
> > > > > > On Mon, 28 Nov 2022 at 16:36, Nick Telford <
> nick.telf...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Colt,
> > > > > > >
> > > > > > > I didn't do any profiling, but the 844 implementation:
> > > > > > >
> > > > > > >    - Writes uncommitted records to a temporary RocksDB instance
> > > > > > >       - Since tombstones need to be flagged, all record values
> are
> > > > > > >       prefixed with a value/tombstone marker. This
> necessitates a
> > > > > memory
> > > > > > copy.
> > > > > > >    - On-commit, iterates all records in this temporary
> instance and
> > > > > > >    writes them to the main RocksDB store.
> > > > > > >    - While iterating, the value/tombstone marker needs to be
> parsed
> > > > and
> > > > > > >    the real value extracted. This necessitates another memory
> copy.
> > > > > > >
> > > > > > > My guess is that the cost of iterating the temporary RocksDB
> store
> > > is
> > > > > the
> > > > > > > major factor, with the 2 extra memory copies per-Record
> > > contributing
> > > > a
> > > > > > > significant amount too.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Mon, 28 Nov 2022 at 16:12, Colt McNealy <
> c...@littlehorse.io>
> > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> Out of curiosity, why does the performance of the store
> degrade so
> > > > > > >> significantly with the 844 implementation? I wouldn't be too
> > > > surprised
> > > > > > by
> > > > > > >> a
> > > > > > >> 50-60% drop (caused by each record being written twice), but
> 96%
> > > is
> > > > > > >> extreme.
> > > > > > >>
> > > > > > >> The only thing I can think of which could create such a
> bottleneck
> > > > > would
> > > > > > >> be
> > > > > > >> that perhaps the 844 implementation deserializes and then
> > > > > re-serializes
> > > > > > >> the
> > > > > > >> store values when copying from the uncommitted to committed
> store,
> > > > > but I
> > > > > > >> wasn't able to figure that out when I scanned the PR.
> > > > > > >>
> > > > > > >> Colt McNealy
> > > > > > >> *Founder, LittleHorse.io*
> > > > > > >>
> > > > > > >>
> > > > > > >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <
> > > > nick.telf...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hi everyone,
> > > > > > >> >
> > > > > > >> > I've updated the KIP to resolve all the points that have
> been
> > > > raised
> > > > > > so
> > > > > > >> > far, with one exception: the ALOS default commit interval
> of 5
> > > > > minutes
> > > > > > >> is
> > > > > > >> > likely to cause WriteBatchWithIndex memory to grow too
> large.
> > > > > > >> >
> > > > > > >> > There's a couple of different things I can think of to solve
> > > this:
> > > > > > >> >
> > > > > > >> >    - We already have a memory/record limit in the KIP to
> prevent
> > > > OOM
> > > > > > >> >    errors. Should we choose a default value for these? My
> > > concern
> > > > > here
> > > > > > >> is
> > > > > > >> > that
> > > > > > >> >    anything we choose might seem rather arbitrary. We could
> > > change
> > > > > > >> >    its behaviour such that under ALOS, it only triggers the
> > > commit
> > > > > of
> > > > > > >> the
> > > > > > >> >    StateStore, but under EOS, it triggers a commit of the
> Kafka
> > > > > > >> > transaction.
> > > > > > >> >    - We could introduce a separate `checkpoint.interval.ms`
> to
> > > > > allow
> > > > > > >> ALOS
> > > > > > >> >    to commit the StateStores more frequently than the
> general
> > > > > > >> >    commit.interval.ms? My concern here is that the
> semantics of
> > > > > this
> > > > > > >> > config
> > > > > > >> >    would depend on the processing.mode; under ALOS it would
> > > allow
> > > > > more
> > > > > > >> >    frequently committing stores, whereas under EOS it
> couldn't.
> > > > > > >> >
> > > > > > >> > Any better ideas?
> > > > > > >> >
> > > > > > >> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <
> > > > nick.telf...@gmail.com>
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi Alex,
> > > > > > >> > >
> > > > > > >> > > Thanks for the feedback.
> > > > > > >> > >
> > > > > > >> > > I've updated the discussion of OOM issues by describing
> how
> > > > we'll
> > > > > > >> handle
> > > > > > >> > > it. Here's the new text:
> > > > > > >> > >
> > > > > > >> > > To mitigate this, we will automatically force a Task
> commit if
> > > > the
> > > > > > >> total
> > > > > > >> > >> uncommitted records returned by
> > > > > > >> > >> StateStore#approximateNumUncommittedEntries()  exceeds a
> > > > > threshold,
> > > > > > >> > >> configured by max.uncommitted.state.entries.per.task; or
> the
> > > > > total
> > > > > > >> > >> memory used for buffering uncommitted records returned by
> > > > > > >> > >> StateStore#approximateNumUncommittedBytes() exceeds the
> > > > threshold
> > > > > > >> > >> configured by max.uncommitted.state.bytes.per.task. This
> will
> > > > > > roughly
> > > > > > >> > >> bound the memory required per-Task for buffering
> uncommitted
> > > > > > records,
> > > > > > >> > >> irrespective of the commit.interval.ms, and will
> effectively
> > > > > bound
> > > > > > >> the
> > > > > > >> > >> number of records that will need to be restored in the
> event
> > > > of a
> > > > > > >> > failure.
> > > > > > >> > >>
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > These limits will be checked in StreamTask#process and a
> > > > premature
> > > > > > >> commit
> > > > > > >> > >> will be requested via Task#requestCommit().
> > > > > > >> > >>
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > Note that these new methods provide default
> implementations
> > > that
> > > > > > >> ensure
> > > > > > >> > >> existing custom stores and non-transactional stores (e.g.
> > > > > > >> > >> InMemoryKeyValueStore) do not force any early commits.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > I've chosen to have the StateStore expose approximations
> of
> > > its
> > > > > > buffer
> > > > > > >> > > size/count instead of opaquely requesting a commit in
> order to
> > > > > > >> delegate
> > > > > > >> > the
> > > > > > >> > > decision making to the Task itself. This enables Tasks to
> look
> > > > at
> > > > > > >> *all*
> > > > > > >> > of
> > > > > > >> > > their StateStores, and determine whether an early commit
> is
> > > > > > necessary.
> > > > > > >> > > Notably, it enables pre-Task thresholds, instead of
> per-Store,
> > > > > which
> > > > > > >> > > prevents Tasks with many StateStores from using much more
> > > memory
> > > > > > than
> > > > > > >> > Tasks
> > > > > > >> > > with one StateStore. This makes sense, since commits are
> done
> > > > > > by-Task,
> > > > > > >> > not
> > > > > > >> > > by-Store.
> > > > > > >> > >
> > > > > > >> > > Prizes* for anyone who can come up with a better name for
> the
> > > > new
> > > > > > >> config
> > > > > > >> > > properties!
> > > > > > >> > >
> > > > > > >> > > Thanks for pointing out the potential performance issues
> of
> > > > WBWI.
> > > > > > From
> > > > > > >> > the
> > > > > > >> > > benchmarks that user posted[1], it looks like WBWI still
> > > > performs
> > > > > > >> > > considerably better than individual puts, which is the
> > > existing
> > > > > > >> design,
> > > > > > >> > so
> > > > > > >> > > I'd actually expect a performance boost from WBWI, just
> not as
> > > > > great
> > > > > > >> as
> > > > > > >> > > we'd get from a plain WriteBatch. This does suggest that a
> > > good
> > > > > > >> > > optimization would be to use a regular WriteBatch for
> > > > restoration
> > > > > > (in
> > > > > > >> > > RocksDBStore#restoreBatch), since we know that those
> records
> > > > will
> > > > > > >> never
> > > > > > >> > be
> > > > > > >> > > queried before they're committed.
> > > > > > >> > >
> > > > > > >> > > 1:
> > > > > > >> >
> > > > > >
> > > >
> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > > > > > >> > >
> > > > > > >> > > * Just kidding, no prizes, sadly.
> > > > > > >> > >
> > > > > > >> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
> > > > > > >> > > <asorokou...@confluent.io.invalid> wrote:
> > > > > > >> > >
> > > > > > >> > >> Hey Nick,
> > > > > > >> > >>
> > > > > > >> > >> Thank you for the KIP! With such a significant
> performance
> > > > > > >> degradation
> > > > > > >> > in
> > > > > > >> > >> the secondary store approach, we should definitely
> consider
> > > > > > >> > >> WriteBatchWithIndex. I also like encapsulating
> checkpointing
> > > > > inside
> > > > > > >> the
> > > > > > >> > >> default state store implementation to improve
> performance.
> > > > > > >> > >>
> > > > > > >> > >> +1 to John's comment to keep the current checkpointing
> as a
> > > > > > fallback
> > > > > > >> > >> mechanism. We want to keep existing users' workflows
> intact
> > > if
> > > > we
> > > > > > >> can. A
> > > > > > >> > >> non-intrusive way would be to add a separate StateStore
> > > method,
> > > > > > say,
> > > > > > >> > >> StateStore#managesCheckpointing(), that controls whether
> the
> > > > > state
> > > > > > >> store
> > > > > > >> > >> implementation owns checkpointing.
> > > > > > >> > >>
> > > > > > >> > >> I think that a solution to the transactional writes
> should
> > > > > address
> > > > > > >> the
> > > > > > >> > >> OOMEs. One possible way to address that is to wire
> > > StateStore's
> > > > > > >> commit
> > > > > > >> > >> request by adding, say, StateStore#commitNeeded that is
> > > checked
> > > > > in
> > > > > > >> > >> StreamTask#commitNeeded via the corresponding
> > > > > > ProcessorStateManager.
> > > > > > >> > With
> > > > > > >> > >> that change, RocksDBStore will have to track the current
> > > > > > transaction
> > > > > > >> > size
> > > > > > >> > >> and request a commit when the size goes over a
> (configurable)
> > > > > > >> threshold.
> > > > > > >> > >>
> > > > > > >> > >> AFAIU WriteBatchWithIndex might perform significantly
> slower
> > > > than
> > > > > > >> > non-txn
> > > > > > >> > >> puts as the batch size grows [1]. We should have a
> > > > configuration
> > > > > to
> > > > > > >> fall
> > > > > > >> > >> back to the current behavior (and/or disable txn stores
> for
> > > > ALOS)
> > > > > > >> unless
> > > > > > >> > >> the benchmarks show negligible overhead for longer
> commits /
> > > > > > >> > large-enough
> > > > > > >> > >> batch sizes.
> > > > > > >> > >>
> > > > > > >> > >> If you prefer to keep the KIP smaller, I would rather
> cut out
> > > > > > >> > >> state-store-managed checkpointing rather than proper OOMe
> > > > > handling
> > > > > > >> and
> > > > > > >> > >> being able to switch to non-txn behavior. The
> checkpointing
> > > is
> > > > > not
> > > > > > >> > >> necessary to solve the recovery-under-EOS problem. On the
> > > other
> > > > > > hand,
> > > > > > >> > once
> > > > > > >> > >> WriteBatchWithIndex is in, it will be much easier to add
> > > > > > >> > >> state-store-managed checkpointing.
> > > > > > >> > >>
> > > > > > >> > >> If you share the current implementation, I am happy to
> help
> > > you
> > > > > > >> address
> > > > > > >> > >> the
> > > > > > >> > >> OOMe and configuration parts as well as review and test
> the
> > > > > patch.
> > > > > > >> > >>
> > > > > > >> > >> Best,
> > > > > > >> > >> Alex
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > >> > >> 1. https://github.com/facebook/rocksdb/issues/608
> > > > > > >> > >>
> > > > > > >> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <
> > > > > > nick.telf...@gmail.com
> > > > > > >> >
> > > > > > >> > >> wrote:
> > > > > > >> > >>
> > > > > > >> > >> > Hi John,
> > > > > > >> > >> >
> > > > > > >> > >> > Thanks for the review and feedback!
> > > > > > >> > >> >
> > > > > > >> > >> > 1. Custom Stores: I've been mulling over this problem
> > > myself.
> > > > > As
> > > > > > it
> > > > > > >> > >> stands,
> > > > > > >> > >> > custom stores would essentially lose checkpointing
> with no
> > > > > > >> indication
> > > > > > >> > >> that
> > > > > > >> > >> > they're expected to make changes, besides a line in the
> > > > release
> > > > > > >> > notes. I
> > > > > > >> > >> > agree that the best solution would be to provide a
> default
> > > > that
> > > > > > >> > >> checkpoints
> > > > > > >> > >> > to a file. The one thing I would change is that the
> > > > > checkpointing
> > > > > > >> is
> > > > > > >> > to
> > > > > > >> > >> a
> > > > > > >> > >> > store-local file, instead of a per-Task file. This way
> the
> > > > > > >> StateStore
> > > > > > >> > >> still
> > > > > > >> > >> > technically owns its own checkpointing (via a default
> > > > > > >> implementation),
> > > > > > >> > >> and
> > > > > > >> > >> > the StateManager/Task execution engine doesn't need to
> know
> > > > > > >> anything
> > > > > > >> > >> about
> > > > > > >> > >> > checkpointing, which greatly simplifies some of the
> logic.
> > > > > > >> > >> >
> > > > > > >> > >> > 2. OOME errors: The main reasons why I didn't explore a
> > > > > solution
> > > > > > to
> > > > > > >> > >> this is
> > > > > > >> > >> > a) to keep this KIP as simple as possible, and b)
> because
> > > I'm
> > > > > not
> > > > > > >> > >> exactly
> > > > > > >> > >> > how to signal that a Task should commit prematurely.
> I'm
> > > > > > confident
> > > > > > >> > it's
> > > > > > >> > >> > possible, and I think it's worth adding a section on
> > > handling
> > > > > > this.
> > > > > > >> > >> Besides
> > > > > > >> > >> > my proposal to force an early commit once memory usage
> > > > reaches
> > > > > a
> > > > > > >> > >> threshold,
> > > > > > >> > >> > is there any other approach that you might suggest for
> > > > tackling
> > > > > > >> this
> > > > > > >> > >> > problem?
> > > > > > >> > >> >
> > > > > > >> > >> > 3. ALOS: I can add in an explicit paragraph, but my
> > > > assumption
> > > > > is
> > > > > > >> that
> > > > > > >> > >> > since transactional behaviour comes at little/no cost,
> that
> > > > it
> > > > > > >> should
> > > > > > >> > be
> > > > > > >> > >> > available by default on all stores, irrespective of the
> > > > > > processing
> > > > > > >> > mode.
> > > > > > >> > >> > While ALOS doesn't use transactions, the Task itself
> still
> > > > > > >> "commits",
> > > > > > >> > so
> > > > > > >> > >> > the behaviour should be correct under ALOS too. I'm not
> > > > > convinced
> > > > > > >> that
> > > > > > >> > >> it's
> > > > > > >> > >> > worth having both transactional/non-transactional
> stores
> > > > > > >> available, as
> > > > > > >> > >> it
> > > > > > >> > >> > would considerably increase the complexity of the
> codebase,
> > > > for
> > > > > > >> very
> > > > > > >> > >> little
> > > > > > >> > >> > benefit.
> > > > > > >> > >> >
> > > > > > >> > >> > 4. Method deprecation: Are you referring to
> > > > > > >> StateStore#getPosition()?
> > > > > > >> > >> As I
> > > > > > >> > >> > understand it, Position contains the position of the
> > > *source*
> > > > > > >> topics,
> > > > > > >> > >> > whereas the commit offsets would be the *changelog*
> > > offsets.
> > > > So
> > > > > > >> it's
> > > > > > >> > >> still
> > > > > > >> > >> > necessary to retain the Position data, as well as the
> > > > changelog
> > > > > > >> > offsets.
> > > > > > >> > >> > What I meant in the KIP is that Position offsets are
> > > > currently
> > > > > > >> stored
> > > > > > >> > >> in a
> > > > > > >> > >> > file, and since we can atomically store metadata along
> with
> > > > the
> > > > > > >> record
> > > > > > >> > >> > batch we commit to RocksDB, we can move our Position
> > > offsets
> > > > in
> > > > > > to
> > > > > > >> > this
> > > > > > >> > >> > metadata too, and gain the same transactional
> guarantees
> > > that
> > > > > we
> > > > > > >> will
> > > > > > >> > >> for
> > > > > > >> > >> > changelog offsets, ensuring that the Position offsets
> are
> > > > > > >> consistent
> > > > > > >> > >> with
> > > > > > >> > >> > the records that are read from the database.
> > > > > > >> > >> >
> > > > > > >> > >> > Regards,
> > > > > > >> > >> > Nick
> > > > > > >> > >> >
> > > > > > >> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <
> > > > > vvcep...@apache.org>
> > > > > > >> > wrote:
> > > > > > >> > >> >
> > > > > > >> > >> > > Thanks for publishing this alternative, Nick!
> > > > > > >> > >> > >
> > > > > > >> > >> > > The benchmark you mentioned in the KIP-844 discussion
> > > seems
> > > > > > like
> > > > > > >> a
> > > > > > >> > >> > > compelling reason to revisit the built-in
> > > transactionality
> > > > > > >> > mechanism.
> > > > > > >> > >> I
> > > > > > >> > >> > > also appreciate you analysis, showing that for most
> use
> > > > > cases,
> > > > > > >> the
> > > > > > >> > >> write
> > > > > > >> > >> > > batch approach should be just fine.
> > > > > > >> > >> > >
> > > > > > >> > >> > > There are a couple of points that would hold me back
> from
> > > > > > >> approving
> > > > > > >> > >> this
> > > > > > >> > >> > > KIP right now:
> > > > > > >> > >> > >
> > > > > > >> > >> > > 1. Loss of coverage for custom stores.
> > > > > > >> > >> > > The fact that you can plug in a (relatively) simple
> > > > > > >> implementation
> > > > > > >> > of
> > > > > > >> > >> the
> > > > > > >> > >> > > XStateStore interfaces and automagically get a
> > > distributed
> > > > > > >> database
> > > > > > >> > >> out
> > > > > > >> > >> > of
> > > > > > >> > >> > > it is a significant benefit of Kafka Streams. I'd
> hate to
> > > > > lose
> > > > > > >> it,
> > > > > > >> > so
> > > > > > >> > >> it
> > > > > > >> > >> > > would be better to spend some time and come up with
> a way
> > > > to
> > > > > > >> > preserve
> > > > > > >> > >> > that
> > > > > > >> > >> > > property. For example, can we provide a default
> > > > > implementation
> > > > > > of
> > > > > > >> > >> > > `commit(..)` that re-implements the existing
> > > > checkpoint-file
> > > > > > >> > >> approach? Or
> > > > > > >> > >> > > perhaps add an `isTransactional()` flag to the state
> > > store
> > > > > > >> interface
> > > > > > >> > >> so
> > > > > > >> > >> > > that the runtime can decide whether to continue to
> manage
> > > > > > >> checkpoint
> > > > > > >> > >> > files
> > > > > > >> > >> > > vs delegating transactionality to the stores?
> > > > > > >> > >> > >
> > > > > > >> > >> > > 2. Guarding against OOME
> > > > > > >> > >> > > I appreciate your analysis, but I don't think it's
> > > > sufficient
> > > > > > to
> > > > > > >> say
> > > > > > >> > >> that
> > > > > > >> > >> > > we will solve the memory problem later if it becomes
> > > > > necessary.
> > > > > > >> The
> > > > > > >> > >> > > experience leading to that situation would be quite
> bad:
> > > > > > Imagine,
> > > > > > >> > you
> > > > > > >> > >> > > upgrade to AK 3.next, your tests pass, so you deploy
> to
> > > > > > >> production.
> > > > > > >> > >> That
> > > > > > >> > >> > > night, you get paged because your app is now crashing
> > > with
> > > > > > >> OOMEs. As
> > > > > > >> > >> with
> > > > > > >> > >> > > all OOMEs, you'll have a really hard time finding the
> > > root
> > > > > > cause,
> > > > > > >> > and
> > > > > > >> > >> > once
> > > > > > >> > >> > > you do, you won't have a clear path to resolve the
> issue.
> > > > You
> > > > > > >> could
> > > > > > >> > >> only
> > > > > > >> > >> > > tune down the commit interval and cache buffer size
> until
> > > > you
> > > > > > >> stop
> > > > > > >> > >> > getting
> > > > > > >> > >> > > crashes.
> > > > > > >> > >> > >
> > > > > > >> > >> > > FYI, I know of multiple cases where people run EOS
> with
> > > > much
> > > > > > >> larger
> > > > > > >> > >> > commit
> > > > > > >> > >> > > intervals to get better batching than the default,
> so I
> > > > don't
> > > > > > >> think
> > > > > > >> > >> this
> > > > > > >> > >> > > pathological case would be as rare as you suspect.
> > > > > > >> > >> > >
> > > > > > >> > >> > > Given that we already have the rudiments of an idea
> of
> > > what
> > > > > we
> > > > > > >> could
> > > > > > >> > >> do
> > > > > > >> > >> > to
> > > > > > >> > >> > > prevent this downside, we should take the time to
> design
> > > a
> > > > > > >> solution.
> > > > > > >> > >> We
> > > > > > >> > >> > owe
> > > > > > >> > >> > > it to our users to ensure that awesome new features
> don't
> > > > > come
> > > > > > >> with
> > > > > > >> > >> > bitter
> > > > > > >> > >> > > pills unless we can't avoid it.
> > > > > > >> > >> > >
> > > > > > >> > >> > > 3. ALOS mode.
> > > > > > >> > >> > > On the other hand, I didn't see an indication of how
> > > stores
> > > > > > will
> > > > > > >> be
> > > > > > >> > >> > > handled under ALOS (aka non-EOS) mode.
> Theoretically, the
> > > > > > >> > >> > transactionality
> > > > > > >> > >> > > of the store and the processing mode are orthogonal.
> A
> > > > > > >> transactional
> > > > > > >> > >> > store
> > > > > > >> > >> > > would serve ALOS just as well as a non-transactional
> one
> > > > (if
> > > > > > not
> > > > > > >> > >> better).
> > > > > > >> > >> > > Under ALOS, though, the default commit interval is
> five
> > > > > > minutes,
> > > > > > >> so
> > > > > > >> > >> the
> > > > > > >> > >> > > memory issue is far more pressing.
> > > > > > >> > >> > >
> > > > > > >> > >> > > As I see it, we have several options to resolve this
> > > point.
> > > > > We
> > > > > > >> could
> > > > > > >> > >> > > demonstrate that transactional stores work just fine
> for
> > > > ALOS
> > > > > > >> and we
> > > > > > >> > >> can
> > > > > > >> > >> > > therefore just swap over unconditionally. We could
> also
> > > > > disable
> > > > > > >> the
> > > > > > >> > >> > > transactional mechanism under ALOS so that stores
> operate
> > > > > just
> > > > > > >> the
> > > > > > >> > >> same
> > > > > > >> > >> > as
> > > > > > >> > >> > > they do today when run in ALOS mode. Finally, we
> could do
> > > > the
> > > > > > >> same
> > > > > > >> > as
> > > > > > >> > >> in
> > > > > > >> > >> > > KIP-844 and make transactional stores opt-in (it'd be
> > > > better
> > > > > to
> > > > > > >> > avoid
> > > > > > >> > >> the
> > > > > > >> > >> > > extra opt-in mechanism, but it's a good
> > > > get-out-of-jail-free
> > > > > > >> card).
> > > > > > >> > >> > >
> > > > > > >> > >> > > 4. (minor point) Deprecation of methods
> > > > > > >> > >> > >
> > > > > > >> > >> > > You mentioned that the new `commit` method replaces
> > > flush,
> > > > > > >> > >> > > updateChangelogOffsets, and checkpoint. It seems to
> me
> > > that
> > > > > the
> > > > > > >> > point
> > > > > > >> > >> > about
> > > > > > >> > >> > > atomicity and Position also suggests that it
> replaces the
> > > > > > >> Position
> > > > > > >> > >> > > callbacks. However, the proposal only deprecates
> `flush`.
> > > > > > Should
> > > > > > >> we
> > > > > > >> > be
> > > > > > >> > >> > > deprecating other methods as well?
> > > > > > >> > >> > >
> > > > > > >> > >> > > Thanks again for the KIP! It's really nice that you
> and
> > > > Alex
> > > > > > will
> > > > > > >> > get
> > > > > > >> > >> the
> > > > > > >> > >> > > chance to collaborate on both directions so that we
> can
> > > get
> > > > > the
> > > > > > >> best
> > > > > > >> > >> > > outcome for Streams and its users.
> > > > > > >> > >> > >
> > > > > > >> > >> > > -John
> > > > > > >> > >> > >
> > > > > > >> > >> > >
> > > > > > >> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > > > > >> > >> > > > Hi everyone,
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > As I mentioned in the discussion thread for
> KIP-844,
> > > I've
> > > > > > been
> > > > > > >> > >> working
> > > > > > >> > >> > on
> > > > > > >> > >> > > > an alternative approach to achieving better
> > > transactional
> > > > > > >> > semantics
> > > > > > >> > >> for
> > > > > > >> > >> > > > Kafka Streams StateStores.
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > I've published this separately as KIP-892:
> > > Transactional
> > > > > > >> Semantics
> > > > > > >> > >> for
> > > > > > >> > >> > > > StateStores
> > > > > > >> > >> > > > <
> > > > > > >> > >> > >
> > > > > > >> > >> >
> > > > > > >> > >>
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > > > > > >> > >> > > >,
> > > > > > >> > >> > > > so that it can be discussed/reviewed separately
> from
> > > > > KIP-844.
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > Alex: I'm especially interested in what you think!
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > I have a nearly complete implementation of the
> changes
> > > > > > >> outlined in
> > > > > > >> > >> this
> > > > > > >> > >> > > > KIP, please let me know if you'd like me to push
> them
> > > for
> > > > > > >> review
> > > > > > >> > in
> > > > > > >> > >> > > advance
> > > > > > >> > >> > > > of a vote.
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > Regards,
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > Nick
> > > > > > >> > >> > > >
> > > > > > >> > >> > >
> > > > > > >> > >> >
> > > > > > >> > >>
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Reply via email to