One more thing: I noted John's suggestion in the KIP, under "Rejected
Alternatives". I still think it's an idea worth pursuing, but I believe
that it's out of the scope of this KIP, because it solves a different set
of problems to this KIP, and the scope of this one has already grown quite
large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I've updated the KIP (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
> with the latest changes; mostly bringing back "Atomic Checkpointing" (for
> what feels like the 10th time!). I think the one thing missing is some
> changes to metrics (notably the store "flush" metrics will need to be
> renamed to "commit").
>
> The reason I brought back Atomic Checkpointing was to decouple store flush
> from store commit. This is important, because with Transactional
> StateStores, we now need to call "flush" on *every* Task commit, and not
> just when the StateStore is closing; otherwise, our transaction buffer will
> never be written and persisted, and will instead grow unbounded! I experimented
> with some simple solutions, like forcing a store flush whenever the
> transaction buffer was likely to exceed its configured size, but this was
> brittle: it prevented the transaction buffer from being configured to be
> unbounded, and it still would have required explicit flushes of RocksDB,
> yielding sub-optimal performance and memory utilization.
>
> I deemed Atomic Checkpointing to be the "right" way to resolve this
> problem. By ensuring that the changelog offsets that correspond to the most
> recently written records are always atomically written to the StateStore
> (by writing them to the same transaction buffer), we can avoid forcibly
> flushing the RocksDB memtables to disk, letting RocksDB flush them only
> when necessary, without losing any of our consistency guarantees. See the
> updated KIP for more info.
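>
> To make this concrete, here is a rough sketch (illustrative names only, not
> the actual classes in my branch) of what "writing the offsets to the same
> transaction buffer" might look like at the RocksDB level, assuming a
> WriteBatchWithIndex buffer and a dedicated column family for the offsets:
>
>     import java.nio.ByteBuffer;
>     import java.nio.charset.StandardCharsets;
>     import org.rocksdb.ColumnFamilyHandle;
>     import org.rocksdb.RocksDB;
>     import org.rocksdb.RocksDBException;
>     import org.rocksdb.WriteBatchWithIndex;
>     import org.rocksdb.WriteOptions;
>
>     // Sketch only: records and their changelog offset are added to the SAME
>     // write batch, so they become durable together on commit, leaving RocksDB
>     // free to flush its memtables whenever it chooses.
>     class AtomicCheckpointSketch {
>         private final RocksDB db;
>         private final ColumnFamilyHandle offsetsCf; // assumed offsets column family
>         private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
>
>         AtomicCheckpointSketch(final RocksDB db, final ColumnFamilyHandle offsetsCf) {
>             this.db = db;
>             this.offsetsCf = offsetsCf;
>         }
>
>         void put(final byte[] key, final byte[] value) throws RocksDBException {
>             batch.put(key, value);
>         }
>
>         void commit(final String changelogPartition, final long offset)
>                 throws RocksDBException {
>             // The offset rides along in the same batch as the records it covers.
>             batch.put(offsetsCf,
>                       changelogPartition.getBytes(StandardCharsets.UTF_8),
>                       ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
>             try (final WriteOptions opts = new WriteOptions()) {
>                 db.write(opts, batch); // records + offset are applied atomically
>             }
>             batch.clear();
>             // Note: no explicit memtable flush anywhere in here.
>         }
>     }
>
> The important property is simply that the offsets and the records they
> describe share a single atomic write, so they can never get out of sync
> after a crash.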
>
> I have fully implemented these changes, although I'm still not entirely
> happy with the implementation for segmented StateStores, so I plan to
> refactor that. Despite that, all tests pass. If you'd like to try out or
> review this highly experimental and incomplete branch, it's available here:
> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built
> against Kafka 3.5.0 so that I had a stable base to build and test it on,
> and to enable easy apples-to-apples comparisons in a live environment. I
> plan to rebase it against trunk once it's nearer completion and has been
> proven on our main application.
>
> I would really appreciate help in reviewing and testing:
> - Segmented (Versioned, Session and Window) stores
> - Global stores
>
> As I do not currently use either of these, my primary test environment
> doesn't exercise these areas.
>
> I'm going on Parental Leave starting next week for a few weeks, so will
> not have time to move this forward until late August. That said, your
> feedback is welcome and appreciated, I just won't be able to respond as
> quickly as usual.
>
> Regards,
> Nick
>
> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:
>
>> Hi Bruno
>>
>> Yes, that's correct, although the impact on IQ is not something I had
>> considered.
>>
>> What about atomically updating the state store from the transaction
>>> buffer every commit interval and writing the checkpoint (thus, flushing
>>> the memtable) every configured amount of data and/or number of commit
>>> intervals?
>>>
>>
>> I'm not quite sure I follow. Are you suggesting that we add an additional
>> config for the max number of commit intervals between checkpoints? That
>> way, we would checkpoint *either* when the transaction buffers are nearly
>> full, *OR* whenever a certain number of commit intervals have elapsed,
>> whichever comes first?
>>
>> That certainly seems reasonable, although this re-ignites an earlier
>> debate about whether a config should be measured in "number of commit
>> intervals", instead of just an absolute time.
>>
>> FWIW, I realised that this issue is the reason I was pursuing Atomic
>> Checkpointing, as it de-couples memtable flush from checkpointing, which
>> enables us to just checkpoint on every commit without any performance
>> impact. Atomic Checkpointing is definitely the "best" solution, but I'm not
>> sure if this is enough to bring it back into this KIP.
>>
>> I'm currently working on moving all the transactional logic directly into
>> RocksDBStore itself, which does away with the StateStore#newTransaction
>> method, and reduces the number of new classes introduced, significantly
>> reducing the complexity. If it works, and the complexity is drastically
>> reduced, I may try bringing back Atomic Checkpoints into this KIP.
>>
>> Regards,
>> Nick
>>
>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:
>>
>>> Hi Nick,
>>>
>>> Thanks for the insights! Very interesting!
>>>
>>> As far as I understand, you want to atomically update the state store
>>> from the transaction buffer, flush the memtable of a state store and
>>> write the checkpoint not after the commit time elapsed but after the
>>> transaction buffer reached a size that would lead to exceeding
>>> statestore.transaction.buffer.max.bytes before the next commit interval
>>> ends.
>>> That means the Kafka transaction would commit every commit interval, but
>>> the state store would only be atomically updated roughly every
>>> statestore.transaction.buffer.max.bytes of data. Also, IQ would then only
>>> see new data roughly every statestore.transaction.buffer.max.bytes.
>>> After a failure the state store needs to restore up to
>>> statestore.transaction.buffer.max.bytes.
>>>
>>> Is this correct?
>>>
>>> What about atomically updating the state store from the transaction
>>> buffer every commit interval and writing the checkpoint (thus, flushing
>>> the memtable) every configured amount of data and/or number of commit
>>> intervals? In such a way, we would have the same delay for records
>>> appearing in output topics and IQ because both would appear when the
>>> Kafka transaction is committed. However, after a failure the state store
>>> still needs to restore up to statestore.transaction.buffer.max.bytes and
>>> it might restore data that is already in the state store because the
>>> checkpoint lags behind the last stable offset (i.e. the last committed
>>> offset) of the changelog topics. Restoring data that is already in the
>>> state store is idempotent, so EOS should not be violated.
>>> This solution needs at least one new config to specify when a checkpoint
>>> should be written.
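>>>
>>> Purely as an illustration (only the first config below is actually proposed
>>> in the KIP; the second name is made up here to stand in for the new config):
>>>
>>>     import java.util.Properties;
>>>
>>>     class ConfigIllustration {
>>>         static Properties streamsConfig() {
>>>             final Properties props = new Properties();
>>>             // Proposed in KIP-892:
>>>             props.put("statestore.transaction.buffer.max.bytes", "33554432"); // 32 MiB
>>>             // Hypothetical name for "write a checkpoint every N commit intervals":
>>>             props.put("statestore.checkpoint.interval.commits", "10");
>>>             return props;
>>>         }
>>>     }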
>>>
>>>
>>>
>>> A small correction to your previous e-mail that does not change anything
>>> you said: Under ALOS the default commit interval is 30 seconds, not five
>>> seconds.
>>>
>>>
>>> Best,
>>> Bruno
>>>
>>>
>>> On 01.07.23 12:37, Nick Telford wrote:
>>> > Hi everyone,
>>> >
>>> > I've begun performance testing my branch on our staging environment,
>>> > putting it through its paces in our non-trivial application. I'm
>>> already
>>> > observing the same increased flush rate that we saw the last time we
>>> > attempted to use a version of this KIP, but this time, I think I know
>>> why.
>>> >
>>> > Pre-KIP-892, StreamTask#postCommit, which is called at the end of the
>>> Task
>>> > commit process, has the following behaviour:
>>> >
>>> >     - Under ALOS: checkpoint the state stores. This includes
>>> >     flushing memtables in RocksDB. This is acceptable because the
>>> default
>>> >     commit.interval.ms is 5 seconds, so forcibly flushing memtables
>>> every 5
>>> >     seconds is acceptable for most applications.
>>> >     - Under EOS: checkpointing is not done, *unless* it's being
>>> forced, due
>>> >     to e.g. the Task closing or being revoked. This means that under
>>> normal
>>> >     processing conditions, the state stores will not be checkpointed,
>>> and will
>>> >     not have memtables flushed at all, unless RocksDB decides to
>>> flush them on
>>> >     its own. Checkpointing stores and force-flushing their memtables
>>> is only
>>> >     done when a Task is being closed.
>>> >
>>> > Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal
>>> > Task commits, in order to write the RocksDB transaction buffers to the
>>> > state stores, and to ensure the offsets are synced to disk to prevent
>>> > restores from getting out of hand. Consequently, my current
>>> implementation
>>> > calls maybeCheckpoint on *every* Task commit, which is far too
>>> frequent.
>>> > This causes checkpoints every 10,000 records, which is a change in
>>> flush
>>> > behaviour, potentially causing performance problems for some
>>> applications.
>>> >
>>> > I'm looking into possible solutions, and I'm currently leaning towards
>>> > using the statestore.transaction.buffer.max.bytes configuration to
>>> > checkpoint Tasks once we are likely to exceed it. This would
>>> complement the
>>> > existing "early Task commit" functionality that this configuration
>>> > provides, in the following way:
>>> >
>>> >     - Currently, we use statestore.transaction.buffer.max.bytes to
>>> force an
>>> >     early Task commit if processing more records would cause our state
>>> store
>>> >     transactions to exceed the memory assigned to them.
>>> >     - New functionality: when a Task *does* commit, we will not
>>> checkpoint
>>> >     the stores (and hence flush the transaction buffers) unless we
>>> expect to
>>> >     cross the statestore.transaction.buffer.max.bytes threshold before
>>> the next
>>> >     commit
>>> >
>>> > I'm also open to suggestions.
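>>> >
>>> > To make the intent of the second bullet above concrete, the check might
>>> > look roughly like this (hypothetical names; not the actual implementation):
>>> >
>>> >     // On a Task commit, only checkpoint (and hence flush the transaction
>>> >     // buffers) if we expect the buffer to cross the configured limit before
>>> >     // the next commit, assuming the next interval writes roughly as much as
>>> >     // the last one did.
>>> >     final class CheckpointOnCommitSketch {
>>> >         private CheckpointOnCommitSketch() { }
>>> >
>>> >         static boolean shouldCheckpoint(final long currentBufferBytes,
>>> >                                         final long bytesWrittenLastInterval,
>>> >                                         final long maxBufferBytes) {
>>> >             final long projected = currentBufferBytes + bytesWrittenLastInterval;
>>> >             return projected >= maxBufferBytes;
>>> >         }
>>> >     }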
>>> >
>>> > Regards,
>>> > Nick
>>> >
>>> > On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi Bruno!
>>> >>
>>> >> 3.
>>> >> By "less predictable for users", I meant in terms of understanding the
>>> >> performance profile under various circumstances. The more complex the
>>> >> solution, the more difficult it would be for users to understand the
>>> >> performance they see. For example, spilling records to disk when the
>>> >> transaction buffer reaches a threshold would, I expect, reduce write
>>> >> throughput. This reduction in write throughput could be unexpected,
>>> and
>>> >> potentially difficult to diagnose/understand for users.
>>> >> At the moment, I think the "early commit" concept is relatively
>>> >> straightforward; it's easy to document, and conceptually fairly
>>> obvious to
>>> >> users. We could probably add a metric to make it easier to understand
>>> when
>>> >> it happens though.
>>> >>
>>> >> 3. (the second one)
>>> >> The IsolationLevel is *essentially* an indirect way of telling
>>> >> StateStores whether they should be transactional. READ_COMMITTED
>>> >> requires transactions, because it dictates that two threads calling
>>> >> `newTransaction()` should not see writes from the other transaction
>>> >> until they have been committed. With READ_UNCOMMITTED, all bets are off, and
>>> >> stores can allow threads to observe written records at any time,
>>> which is
>>> >> essentially "no transactions". That said, StateStores are free to
>>> implement
>>> >> these guarantees however they can, which is a bit more relaxed than
>>> >> dictating "you must use transactions". For example, with RocksDB we
>>> would
>>> >> implement these as READ_COMMITTED == WBWI-based "transactions",
>>> >> READ_UNCOMMITTED == direct writes to the database. But with other
>>> storage
>>> >> engines, it might be preferable to *always* use transactions, even
>>> when
>>> >> unnecessary; or there may be storage engines that don't provide
>>> >> transactions, but the isolation guarantees can be met using a
>>> different
>>> >> technique.
>>> >> My idea was to try to keep the StateStore interface as loosely coupled
>>> >> from the Streams engine as possible, to give implementers more
>>> freedom, and
>>> >> reduce the amount of internal knowledge required.
>>> >> That said, I understand that "IsolationLevel" might not be the right
>>> >> abstraction, and we can always make it much more explicit if
>>> required, e.g.
>>> >> boolean transactional()
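>>> >>
>>> >> For example, here is a rough sketch (not the actual RocksDBStore changes)
>>> >> of how a RocksDB-based store might interpret the two levels:
>>> >>
>>> >>     import org.rocksdb.RocksDB;
>>> >>     import org.rocksdb.RocksDBException;
>>> >>     import org.rocksdb.WriteBatchWithIndex;
>>> >>     import org.rocksdb.WriteOptions;
>>> >>
>>> >>     // READ_COMMITTED: buffer writes in a WriteBatchWithIndex until commit.
>>> >>     // READ_UNCOMMITTED: write straight to the database, visible immediately.
>>> >>     class IsolationAwareWrites {
>>> >>         private final RocksDB db;
>>> >>         private final boolean readCommitted; // derived from the context's IsolationLevel
>>> >>         private final WriteBatchWithIndex buffer = new WriteBatchWithIndex(true);
>>> >>
>>> >>         IsolationAwareWrites(final RocksDB db, final boolean readCommitted) {
>>> >>             this.db = db;
>>> >>             this.readCommitted = readCommitted;
>>> >>         }
>>> >>
>>> >>         void put(final byte[] key, final byte[] value) throws RocksDBException {
>>> >>             if (readCommitted) {
>>> >>                 buffer.put(key, value); // only visible via this buffer until commit
>>> >>             } else {
>>> >>                 db.put(key, value);     // immediately visible to all threads
>>> >>             }
>>> >>         }
>>> >>
>>> >>         void commit() throws RocksDBException {
>>> >>             if (readCommitted) {
>>> >>                 try (final WriteOptions opts = new WriteOptions()) {
>>> >>                     db.write(opts, buffer); // apply the buffered writes atomically
>>> >>                 }
>>> >>                 buffer.clear();
>>> >>             }
>>> >>         }
>>> >>     }
>>> >>
>>> >> Other storage engines could meet the same guarantees with a completely
>>> >> different mechanism, as described above.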
>>> >>
>>> >> 7-8.
>>> >> I can make these changes either later today or tomorrow.
>>> >>
>>> >> Small update:
>>> >> I've rebased my branch on trunk and fixed a bunch of issues that
>>> needed
>>> >> addressing. Currently, all the tests pass, which is promising, but it
>>> will
>>> >> need to undergo some performance testing. I haven't (yet) worked on
>>> >> removing the `newTransaction()` stuff, but I would expect that,
>>> >> behaviourally, it should make no difference. The branch is available
>>> at
>>> >> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
>>> >> interested in taking an early look.
>>> >>
>>> >> Regards,
>>> >> Nick
>>> >>
>>> >> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org>
>>> wrote:
>>> >>
>>> >>> Hi Nick,
>>> >>>
>>> >>> 1.
>>> >>> Yeah, I agree with you. That was actually also my point. I understood
>>> >>> that John was proposing the ingestion path as a way to avoid the
>>> early
>>> >>> commits. Probably, I misinterpreted the intent.
>>> >>>
>>> >>> 2.
>>> >>> I agree with John here, that actually it is public API. My question
>>> is
>>> >>> how this usage pattern affects normal processing.
>>> >>>
>>> >>> 3.
>>> >>> My concern is that checking for the size of the transaction buffer
>>> and
>>> >>> maybe triggering an early commit affects the whole processing of
>>> Kafka
>>> >>> Streams. The transactionality of a state store is not confined to the
>>> >>> state store itself, but spills over and changes the behavior of other
>>> >>> parts of the system. I agree with you that it is a decent
>>> compromise. I
>>> >>> just wanted to analyse the downsides and list the options to overcome
>>> >>> them. I also agree with you that all options seem quite heavy
>>> compared
>>> >>> with your KIP. I do not understand what you mean by "less predictable
>>> >>> for users", though.
>>> >>>
>>> >>>
>>> >>> I found the discussions about the alternatives really interesting.
>>> But I
>>> >>> also think that your plan sounds good and we should continue with it!
>>> >>>
>>> >>>
>>> >>> Some comments on your reply to my e-mail on June 20th:
>>> >>>
>>> >>> 3.
>>> >>> Ah, now, I understand the reasoning behind putting isolation level in
>>> >>> the state store context. Thanks! Should that also be a way to give the
>>> >>> state store the opportunity to decide whether to turn on transactions
>>> >>> or not?
>>> >>> With my comment, I was more concerned about how you would know whether a
>>> >>> checkpoint file needs to be written under EOS, if you do not have a way
>>> >>> to know whether the state store is transactional or not. If a state store is
>>> >>> transactional, the checkpoint file can be written during normal
>>> >>> processing under EOS. If the state store is not transactional, the
>>> >>> checkpoint file must not be written under EOS.
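>>> >>>
>>> >>> In other words, the decision might look something like this sketch
>>> >>> (hypothetical names, just to illustrate the rule):
>>> >>>
>>> >>>     // Under EOS, only a transactional state store may write its checkpoint
>>> >>>     // file during normal processing; otherwise the checkpoint may only be
>>> >>>     // written when it is forced (e.g. on close/revocation).
>>> >>>     final class CheckpointDecisionSketch {
>>> >>>         private CheckpointDecisionSketch() { }
>>> >>>
>>> >>>         static boolean mayWriteCheckpoint(final boolean eosEnabled,
>>> >>>                                           final boolean storeIsTransactional,
>>> >>>                                           final boolean enforced) {
>>> >>>             if (enforced) {
>>> >>>                 return true; // e.g. the Task is closing or being revoked
>>> >>>             }
>>> >>>             if (!eosEnabled) {
>>> >>>                 return true; // ALOS: checkpoint as before
>>> >>>             }
>>> >>>             return storeIsTransactional;
>>> >>>         }
>>> >>>     }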
>>> >>>
>>> >>> 7.
>>> >>> My point was about considering not only the bytes in memory in the
>>> >>> config statestore.uncommitted.max.bytes, but also bytes that might be
>>> >>> spilled to disk. Basically, I was wondering whether you should remove the
>>> >>> "memory" in "Maximum number of memory bytes to be used to
>>> >>> buffer uncommitted state-store records." My thinking was that even
>>> if a
>>> >>> state store spills uncommitted bytes to disk, limiting the overall
>>> bytes
>>> >>> might make sense. Thinking about it again and considering the recent
>>> >>> discussions, it does not make too much sense anymore.
>>> >>> I like the name statestore.transaction.buffer.max.bytes that you
>>> proposed.
>>> >>>
>>> >>> 8.
>>> >>> A high-level description (without implementation details) of how
>>> Kafka
>>> >>> Streams will manage the commit of changelog transactions, state store
>>> >>> transactions and checkpointing would be great. Would be great if you
>>> >>> could also add some sentences about the behavior in case of a
>>> failure.
>>> >>> For instance how does a transactional state store recover after a
>>> >>> failure or what happens with the transaction buffer, etc. (that is
>>> what
>>> >>> I meant by "fail-over" in point 9.)
>>> >>>
>>> >>> Best,
>>> >>> Bruno
>>> >>>
>>> >>> On 21.06.23 18:50, Nick Telford wrote:
>>> >>>> Hi Bruno,
>>> >>>>
>>> >>>> 1.
>>> >>>> Isn't this exactly the same issue that WriteBatchWithIndex
>>> transactions
>>> >>>> have, whereby exceeding (or likely to exceed) configured memory
>>> needs to
>>> >>>> trigger an early commit?
>>> >>>>
>>> >>>> 2.
>>> >>>> This is one of my big concerns. Ultimately, any approach based on
>>> >>> cracking
>>> >>>> open RocksDB internals and using it in ways it's not really designed
>>> >>> for is
>>> >>>> likely to have some unforeseen performance or consistency issues.
>>> >>>>
>>> >>>> 3.
>>> >>>> What's your motivation for removing these early commits? While not
>>> >>> ideal, I
>>> >>>> think they're a decent compromise to ensure consistency whilst
>>> >>> maintaining
>>> >>>> good and predictable performance.
>>> >>>> All 3 of your suggested ideas seem *very* complicated, and might
>>> >>> actually
>>> >>>> make behaviour less predictable for users as a consequence.
>>> >>>>
>>> >>>> I'm a bit concerned that the scope of this KIP is growing a bit out
>>> of
>>> >>>> control. While it's good to discuss ideas for future improvements, I
>>> >>> think
>>> >>>> it's important to narrow the scope down to a design that achieves
>>> the
>>> >>> most
>>> >>>> pressing objectives (constant sized restorations during dirty
>>> >>>> close/unexpected errors). Any design that this KIP produces can
>>> >>> ultimately
>>> >>>> be changed in the future, especially if the bulk of it is internal
>>> >>>> behaviour.
>>> >>>>
>>> >>>> I'm going to spend some time next week trying to re-work the
>>> original
>>> >>>> WriteBatchWithIndex design to remove the newTransaction() method,
>>> such
>>> >>> that
>>> >>>> it's just an implementation detail of RocksDBStore. That way, if we
>>> >>> want to
>>> >>>> replace WBWI with something in the future, like the SST file
>>> management
>>> >>>> outlined by John, then we can do so with little/no API changes.
>>> >>>>
>>> >>>> Regards,
>>> >>>>
>>> >>>> Nick
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>
