Hello Nick,

First of all, thanks a lot for the great effort you've put into driving
this KIP! I'm really glad to see it finally coming through, as many people
in the community have asked for this. At the same time, I honestly feel a
bit ashamed for not putting enough of my time into supporting it and
pushing it over the finish line (you raised this KIP almost a year ago).

I have only skimmed the DISCUSS thread so far, and I'm not sure I've fully
digested all the bullet points. But with the goal of helping take it over
the finish line, I'd like to share some thoughts off the top of my head,
only on point #4 above, which I feel may be the main hurdle to reaching
consensus on the current KIP.

The general question I asked myself is whether we want to couple the "IQ
reading mode" with the "processing mode". Technically, I tend to agree with
you that it feels like a bug if a single user chooses "EOS" for the
processing mode while choosing "read uncommitted" for the IQ reading mode.
At the same time, I wonder whether there could be two different people (or
even two teams), one using the Streams API to build the app and the other
using the IQ API to query the running state of the app. I know this is less
a technical question and more a design one, but if that could ever be the
case, the person using the IQ API may know the risks of read uncommitted
and still choose it for the sake of performance, regardless of whether the
processing mode configured by the other person is EOS or not. In that
regard, I'm leaning towards "leave the door open, and close it later if we
find it's a bad idea", with a configuration that we can potentially
deprecate, rather than "shut the door, clean for everyone". More
specifically, I'd allow the processing mode and the IQ read mode to be
decoupled, and if we find that the cases I speculated about above don't
exist, or people start complaining a lot, we can still enforce coupling
them later.
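
To make the two options concrete, here is a minimal sketch of how the
effective IQ isolation level could be resolved under each policy. It is
only an illustration under assumptions: default.state.isolation.level is
the config name proposed earlier in this thread (it is not an existing
Kafka Streams config), and the class and method names are made up for the
example.

```java
import java.util.Properties;

public class IsolationLevelSketch {
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    // Existing Kafka Streams config key.
    static final String PROCESSING_GUARANTEE = "processing.guarantee";
    // Config key proposed in this thread; an assumption, not a released config.
    static final String DEFAULT_STATE_ISOLATION_LEVEL = "default.state.isolation.level";

    // True for exactly_once, exactly_once_beta and exactly_once_v2.
    static boolean isEos(Properties props) {
        return props.getProperty(PROCESSING_GUARANTEE, "at_least_once")
                    .startsWith("exactly_once");
    }

    // "Shut the door": the IQ isolation level is derived from the
    // processing guarantee alone, and any explicit setting is ignored.
    static IsolationLevel coupled(Properties props) {
        return isEos(props) ? IsolationLevel.READ_COMMITTED
                            : IsolationLevel.READ_UNCOMMITTED;
    }

    // "Leave the door open": an explicit setting wins; otherwise fall
    // back to the coupled default.
    static IsolationLevel decoupled(Properties props) {
        String explicit = props.getProperty(DEFAULT_STATE_ISOLATION_LEVEL);
        return explicit != null ? IsolationLevel.valueOf(explicit) : coupled(props);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PROCESSING_GUARANTEE, "exactly_once_v2");
        props.setProperty(DEFAULT_STATE_ISOLATION_LEVEL, "READ_UNCOMMITTED");
        System.out.println("coupled:   " + coupled(props));   // READ_COMMITTED
        System.out.println("decoupled: " + decoupled(props)); // READ_UNCOMMITTED
    }
}
```

With the decoupled policy, the team owning the IQ side can opt into
uncommitted reads even when the app runs under EOS. Switching to the
coupled policy later would only mean deprecating the explicit setting.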

Again, just my 2c here. Thanks again for the great patience and
diligence on this KIP.


Guozhang



On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com> wrote:
>
> Hi Bruno,
>
> 4.
> I'll hold off on making that change until we have a consensus as to what
> configuration to use to control all of this, as it'll be affected by the
> decision on EOS isolation levels.
>
> 5.
> Done. I've chosen "committedOffsets".
>
> Regards,
> Nick
>
> On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi Nick,
> >
> > 1.
> > Yeah, you are probably right that it does not make too much sense.
> > Thanks for the clarification!
> >
> >
> > 4.
> > Yes, sorry for the back and forth, but I think for the sake of the KIP
> > it is better to leave the ALOS behavior as it is for now due to the
> > possible issues you would run into. Maybe we can find a solution in the
> > future. Now the question returns to whether we really need
> > default.state.isolation.level. Maybe the config could be the feature
> > flag Sophie requested.
> >
> >
> > 5.
> > There is a guideline in Kafka not to use the get prefix for getters (at
> > least in the public API). Thus, could you please rename
> >
> > getCommittedOffset(TopicPartition partition) ->
> > committedOffsetFor(TopicPartition partition)
> >
> > You can also propose an alternative to committedOffsetFor().
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 10/13/23 3:21 PM, Nick Telford wrote:
> > > Hi Bruno,
> > >
> > > Thanks for getting back to me.
> > >
> > > 1.
> > > I think this should be possible. Are you thinking of the situation where a
> > > user may downgrade to a previous version of Kafka Streams? In that case,
> > > sadly, the RocksDBStore would get wiped by the older version of Kafka
> > > Streams anyway, because that version wouldn't understand the extra column
> > > family (that holds offsets), so the missing Position file would
> > > automatically get rebuilt when the store is rebuilt from the changelog.
> > > Are there other situations than downgrade where a transactional store could
> > > be replaced by a non-transactional one? I can't think of any.
> > >
> > > 2.
> > > Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to be
> > > fleshed out. I'll work on that. How much detail do you need?
> > >
> > > 3.
> > > See my previous email discussing this.
> > >
> > > 4.
> > > Hmm, this is an interesting point. Are you suggesting that under ALOS
> > > READ_COMMITTED should not be supported?
> > >
> > > Regards,
> > > Nick
> > >
> > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <cado...@apache.org> wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> I think the KIP is converging!
> > >>
> > >>
> > >> 1.
> > >> I am wondering whether it makes sense to write the position file during
> > >> close as we do for the checkpoint file, so that in case the state store
> > >> is replaced with a non-transactional state store the non-transactional
> > >> state store finds the position file. I think, this is not strictly
> > >> needed, but would be a nice behavior instead of just deleting the
> > >> position file.
> > >>
> > >>
> > >> 2.
> > >> The test plan does not mention integration tests. Don't you need to
> > >> extend existing ones and add new ones? Also, for upgrading and
> > >> downgrading you might need integration and/or system tests.
> > >>
> > >>
> > >> 3.
> > >> I think Sophie made a point. Although, IQ reading from uncommitted data
> > >> under EOS might be considered a bug by some people. Thus, your KIP would
> > >> fix a bug rather than changing the intended behavior. However, I also
> > >> see that a feature flag would help users that rely on this buggy
> > >> behavior (at least until AK 4.0).
> > >>
> > >>
> > >> 4.
> > >> This is related to the previous point. I assume that the difference
> > >> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the
> > >> former you enable transactions on the state store and in the latter you
> > >> disable them. If my assumption is correct, I think that is an issue.
> > >> Let's assume under ALOS Streams fails over a couple of times more or
> > >> less at the same step in processing after value 3 is added to an
> > >> aggregation but the offset of the corresponding input record was not
> > >> committed. With transactions disabled, the aggregation value would
> > >> increase by 3 for each failover. With transactions enabled, value 3
> > >> would only be added to the aggregation once when the offset of the input
> > >> record is committed and the transaction finally completes. So the
> > >> content of the state store would change depending on the configuration
> > >> for IQ. IMO, the content of the state store should be independent from
> > >> IQ. Given this issue, I propose to not use transactions with ALOS at
> > >> all. I was a big proponent of using transactions with ALOS, but I
> > >> realized that transactions with ALOS is not as easy as enabling
> > >> transactions on state stores. Another aspect that is problematic is that
> > >> the changelog topic which actually replicates the state store is not
> > >> transactional under ALOS. Thus, it might happen that the state store and
> > >> the changelog differ in their content. All of this is maybe solvable
> > >> somehow, but for the sake of this KIP, I would leave it for the future.
> > >>
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >>
> > >> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
> > >>> Hey Nick! First of all thanks for taking up this awesome feature, I'm
> > >> sure
> > >>> every single
> > >>> Kafka Streams user and dev would agree that it is sorely needed.
> > >>>
> > >>> I've just been catching up on the KIP and surrounding discussion, so
> > >> please
> > >>> forgive me
> > >>> for any misunderstandings or misinterpretations of the current plan and
> > >>> don't hesitate to
> > >>> correct me.
> > >>>
> > >>> Before I jump in, I just want to say that having seen this drag on for
> > so
> > >>> long, my singular
> > >>> goal in responding is to help this KIP past a perceived impasse so we
> > can
> > >>> finally move on
> > >>> to voting and implementing it. Long discussions are to be expected for
> > >>> major features like
> > >>> this but it's completely on us as the Streams devs to make sure there
> > is
> > >> an
> > >>> end in sight
> > >>> for any ongoing discussion.
> > >>>
> > >>> With that said, it's my understanding that the KIP as currently
> > proposed
> > >> is
> > >>> just not tenable
> > >>> for Kafka Streams, and would prevent some EOS users from upgrading to
> > the
> > >>> version it
> > >>> first appears in. Given that we can't predict or guarantee whether any
> > of
> > >>> the followup KIPs
> > >>> would be completed in the same release cycle as this one, we need to
> > make
> > >>> sure that the
> > >>> feature is either compatible with all current users or else
> > >> feature-flagged
> > >>> so that they may
> > >>> opt in/out.
> > >>>
> > >>> Therefore, IIUC we need to have either (or both) of these as
> > >>> fully-implemented config options:
> > >>> 1. default.state.isolation.level
> > >>> 2. enable.transactional.state.stores
> > >>>
> > >>> This way EOS users for whom read_committed semantics are not viable can
> > >>> still upgrade,
> > >>> and either use the isolation.level config to leverage the new txn state
> > >>> stores without sacrificing
> > >>> their application semantics, or else simply keep the transactional
> > state
> > >>> stores disabled until we
> > >>> are able to fully implement the isolation level configuration at either
> > >> an
> > >>> application or query level.
> > >>>
> > >>> Frankly you are the expert here and know much more about the tradeoffs
> > in
> > >>> both semantics and
> > >>> effort level of implementing one of these configs vs the other. In my
> > >>> opinion, either option would
> > >>> be fine and I would leave the decision of which one to include in this
> > >> KIP
> > >>> completely up to you.
> > >>> I just don't see a way for the KIP to proceed without some variation of
> > >> the
> > >>> above that would allow
> > >>> EOS users to opt-out of read_committed.
> > >>>
> > >>> (If it's all the same to you, I would recommend always including a
> > >> feature
> > >>> flag in large structural
> > >>> changes like this. No matter how much I trust someone or myself to
> > >>> implement a feature, you just
> > >>> never know what kind of bugs might slip in, especially with the very
> > >> first
> > >>> iteration that gets released.
> > >>> So personally, my choice would be to add the feature flag and leave it
> > >> off
> > >>> by default. If all goes well
> > >>> you can do a quick KIP to enable it by default as soon as the
> > >>> isolation.level config has been
> > >>> completed. But feel free to just pick whichever option is easiest or
> > >>> quickest for you to implement)
> > >>>
> > >>> Hope this helps move the discussion forward,
> > >>> Sophie
> > >>>
> > >>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <nick.telf...@gmail.com>
> > >> wrote:
> > >>>
> > >>>> Hi Bruno,
> > >>>>
> > >>>> Agreed, I can live with that for now.
> > >>>>
> > >>>> In an effort to keep the scope of this KIP from expanding, I'm leaning
> > >>>> towards just providing a configurable default.state.isolation.level and
> > >>>> removing IsolationLevel from the StateStoreContext. This would be
> > >>>> compatible with adding support for query-time IsolationLevels in the
> > >>>> future, whilst providing a way for users to select an isolation level now.
> > >>>>
> > >>>> The big problem with this, however, is that if a user selects
> > >>>> processing.guarantee = "exactly_once(_v2|_beta)" and
> > >>>> default.state.isolation.level = "READ_UNCOMMITTED", we need to guarantee
> > >>>> that the data isn't written to disk until commit() is called, but we also
> > >>>> need to permit IQ threads to read from the ongoing transaction.
> > >>>>
> > >>>> A simple solution would be to (temporarily) forbid this combination of
> > >>>> configuration, and have default.state.isolation.level automatically switch
> > >>>> to READ_COMMITTED when processing.guarantee is anything other than
> > >>>> at_least_once. Do you think this would be acceptable?
> > >>>>
> > >>>> In a later KIP, we can add support for query-time isolation levels and
> > >>>> solve this particular problem there, which would relax this restriction.
> > >>>>
> > >>>> Regards,
> > >>>> Nick
> > >>>>
> > >>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org>
> > >> wrote:
> > >>>>
> > >>>>> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
> > >>>>> it is perfectly valid to say InMemoryKeyValueStore does not support
> > >>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
> > >>>>> at the moment.
> > >>>>>
> > >>>>> Best,
> > >>>>> Bruno
> > >>>>>
> > >>>>> On 9/18/23 7:12 PM, Nick Telford wrote:
> > >>>>>> Oh! One other concern I haven't mentioned: if we make IsolationLevel a
> > >>>>>> query-time constraint, then we need to add support for READ_COMMITTED to
> > >>>>>> InMemoryKeyValueStore too, which will require some changes to the
> > >>>>>> implementation.
> > >>>>>>
> > >>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com
> > >
> > >>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> I agree that having IsolationLevel be determined at query-time is the
> > >>>>>>> ideal design, but there are a few sticking points:
> > >>>>>>>
> > >>>>>>> 1.
> > >>>>>>> There needs to be some way to communicate the IsolationLevel down to the
> > >>>>>>> RocksDBStore itself, so that the query can respect it. Since stores are
> > >>>>>>> "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.),
> > >>>>>>> we need some way to deliver that information to the bottom layer. For IQv2,
> > >>>>>>> we can use the existing StateStore#query() method, but IQv1 has no way to do
> > >>>>>>> this.
> > >>>>>>>
> > >>>>>>> A simple approach, which would potentially open up other options, would be
> > >>>>>>> to add something like: ReadOnlyKeyValueStore<K, V>
> > >>>>>>> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and
> > >>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).
> > >>>>>>>
> > >>>>>>> 2.
> > >>>>>>> As mentioned above, RocksDB WriteBatches are not thread-safe, which causes
> > >>>>>>> a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a
> > >>>>>>> look at RocksDB Transactions[1], but they solve a very different problem,
> > >>>>>>> and have the same thread-safety issue.
> > >>>>>>>
> > >>>>>>> One possible approach that I mentioned is chaining WriteBatches: every
> > >>>>>>> time a new Interactive Query is received (i.e. readOnlyView, see above,
> > >>>>>>> is called) we "freeze" the existing WriteBatch, and start a new one for new
> > >>>>>>> writes. The Interactive Query queries the "chain" of previous WriteBatches
> > >>>>>>> + the underlying database; while the StreamThread starts writing to the
> > >>>>>>> *new* WriteBatch. On-commit, the StreamThread would write *all*
> > >>>>>>> WriteBatches in the chain to the database (that have not yet been written).
> > >>>>>>>
> > >>>>>>> WriteBatches would be closed/freed only when they have been both
> > >>>>>>> committed, and all open Interactive Queries on them have been closed. This
> > >>>>>>> would require some reference counting.
> > >>>>>>>
> > >>>>>>> Obviously a drawback of this approach is the potential for increased
> > >>>>>>> memory usage: if an Interactive Query is long-lived, for example by doing a
> > >>>>>>> full scan over a large database, or even just pausing in the middle of an
> > >>>>>>> iteration, then the existing chain of WriteBatches could be kept around for
> > >>>>>>> a long time, potentially forever.
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>>
> > >>>>>>> A.
> > >>>>>>> Going off on a tangent, it looks like in addition to supporting
> > >>>>>>> READ_COMMITTED queries, we could go further and support REPEATABLE_READ
> > >>>>>>> queries (i.e. where subsequent reads to the same key in the same
> > >>>>>>> Interactive Query are guaranteed to yield the same value) by making use of
> > >>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so the performance
> > >>>>>>> impact is likely to be negligible, but they do require that the Interactive
> > >>>>>>> Query session can be explicitly closed.
> > >>>>>>>
> > >>>>>>> This could be achieved if we made the above readOnlyView interface look
> > >>>>>>> more like:
> > >>>>>>>
> > >>>>>>> interface ReadOnlyKeyValueView<K, V>
> > >>>>>>>         extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}
> > >>>>>>>
> > >>>>>>> interface ReadOnlyKeyValueStore<K, V> {
> > >>>>>>>        ...
> > >>>>>>>        ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> But this would be a breaking change, as existing IQv1 queries are
> > >>>>>>> guaranteed to never call store.close(), and therefore these would leak
> > >>>>>>> memory under REPEATABLE_READ.
> > >>>>>>>
> > >>>>>>> B.
> > >>>>>>> One thing that's notable: MyRocks states that they support READ_COMMITTED
> > >>>>>>> and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4].
> > >>>>>>> This could be because doing so is technically difficult/impossible using
> > >>>>>>> the primitives available in RocksDB.
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>>
> > >>>>>>> Lucas, to address your points:
> > >>>>>>>
> > >>>>>>> U1.
> > >>>>>>> It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
> > >>>>>>> implementations of StateStore that do not support atomic writes. Obviously
> > >>>>>>> in those cases, the guarantees Kafka Streams provides/expects would be
> > >>>>>>> relaxed. Do you think we should require all implementations to support
> > >>>>>>> atomic writes?
> > >>>>>>>
> > >>>>>>> U2.
> > >>>>>>> Stores can support multiple IsolationLevels. As we've discussed above, the
> > >>>>>>> ideal scenario would be to specify the IsolationLevel at query-time.
> > >>>>>>> Failing that, I think the second-best approach is to define the
> > >>>>>>> IsolationLevel for *all* queries based on the processing.guarantee, which is
> > >>>>>>> what the default StateStoreContext#isolationLevel() achieves. Would you
> > >>>>>>> prefer an alternative?
> > >>>>>>>
> > >>>>>>> While the existing implementation is equivalent to READ_UNCOMMITTED, this
> > >>>>>>> can yield unexpected results/errors under EOS, if a transaction is rolled
> > >>>>>>> back. While this would be a change in behaviour for users, it would look
> > >>>>>>> more like a bug fix than a breaking change. That said, we *could* make it
> > >>>>>>> configurable, and default to the existing behaviour (READ_UNCOMMITTED)
> > >>>>>>> instead of inferring it from the processing.guarantee?
> > >>>>>>>
> > >>>>>>> N1, N2.
> > >>>>>>> These were only primitives to avoid boxing costs, but since this is not a
> > >>>>>>> performance sensitive area, it should be fine to change if that's desirable.
> > >>>>>>>
> > >>>>>>> N3.
> > >>>>>>> It's because the store "manages its own offsets", which includes both
> > >>>>>>> committing the offset, *and providing it* via getCommittedOffset().
> > >>>>>>> Personally, I think "managesOffsets" conveys this best, but I don't mind
> > >>>>>>> changing it if the nomenclature is unclear.
> > >>>>>>>
> > >>>>>>> Sorry for the massive emails/essays!
> > >>>>>>> --
> > >>>>>>> Nick
> > >>>>>>>
> > >>>>>>> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> > >>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> > >>>>>>> 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> > >>>>>>> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/
> > >>>>>>>
> > >>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> > >>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Nick,
> > >>>>>>>>
> > >>>>>>>> since I last read it in April, the KIP has become much cleaner and
> > >>>>>>>> easier to read. Great work!
> > >>>>>>>>
> > >>>>>>>> It feels to me the last big open point is whether we can implement
> > >>>>>>>> isolation level as a query parameter. I understand that there are
> > >>>>>>>> implementation concerns, but as Colt says, it would be a great
> > >>>>>>>> addition, and would also simplify the migration path for this
> > >> change.
> > >>>>>>>> Is the implementation problem you mentioned caused by the
> > WriteBatch
> > >>>>>>>> not having a notion of a snapshot, as the underlying DB iterator
> > >>>> does?
> > >>>>>>>> In that case, I am not sure a chain of WriteBatches as you propose
> > >>>>>>>> would fully solve the problem, but maybe I didn't dig enough into
> > >> the
> > >>>>>>>> details to fully understand it.
> > >>>>>>>>
> > >>>>>>>> If it's not possible to implement it now, would it be an option to
> > >>>>>>>> make sure in this KIP that we do not fully close the door on per-query
> > >>>>>>>> isolation levels in the interface, as it may be possible to implement
> > >>>>>>>> the missing primitives in RocksDB or Speedb in the future?
> > >>>>>>>>
> > >>>>>>>> Understanding:
> > >>>>>>>>
> > >>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to be persisted
> > >>>>>>>> atomically with the records?
> > >>>>>>>> * U2) I don't understand the default implementation of `isolationLevel`.
> > >>>>>>>> Shouldn't the isolation level be a property of the underlying store,
> > >>>>>>>> rather than defined by the default config? Existing stores probably don't
> > >>>>>>>> guarantee READ_COMMITTED, so the default should be to return
> > >>>>>>>> READ_UNCOMMITTED.
> > >>>>>>>>
> > >>>>>>>> Nits:
> > >>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong` return type, to
> > >>>>>>>> avoid the `null`?
> > >>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an `OptionalLong`
> > >>>>>>>> return type, to avoid the `-1`?
> > >>>>>>>> * N3) I don't understand why `managesOffsets` uses the 'manage' verb,
> > >>>>>>>> whereas all other methods use the "commits" verb. I'd suggest
> > >>>>>>>> `commitsOffsets`.
> > >>>>>>>>
> > >>>>>>>> Either way, it feels this KIP is very close to the finish line,
> > I'm
> > >>>>>>>> looking forward to seeing this in production!
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Lucas
> > >>>>>>>>
> > >>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io
> > >
> > >>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Making IsolationLevel a query-time constraint, rather than
> > linking
> > >>>> it
> > >>>>>>>> to
> > >>>>>>>>> the processing.guarantee.
> > >>>>>>>>>
> > >>>>>>>>> As I understand it, would this allow even a user of EOS to control whether
> > >>>>>>>>> they read committed or uncommitted records? If so, I am highly in favor of
> > >>>>>>>>> this.
> > >>>>>>>>>
> > >>>>>>>>> I know that I was one of the early people to point out the
> > current
> > >>>>>>>>> shortcoming that IQ reads uncommitted records, but just this
> > >>>> morning I
> > >>>>>>>>> realized a pattern we use which means that (for certain queries)
> > >> our
> > >>>>>>>> system
> > >>>>>>>>> needs to be able to read uncommitted records, which is the
> > current
> > >>>>>>>> behavior
> > >>>>>>>>> of Kafka Streams in EOS.***
> > >>>>>>>>>
> > >>>>>>>>> If IsolationLevel being a query-time decision allows for this,
> > then
> > >>>>> that
> > >>>>>>>>> would be amazing. I would also vote that the default behavior
> > >> should
> > >>>>> be
> > >>>>>>>> for
> > >>>>>>>>> reading uncommitted records, because it is totally possible for a
> > >>>>> valid
> > >>>>>>>>> application to depend on that behavior, and breaking it in a
> > minor
> > >>>>>>>> release
> > >>>>>>>>> might be a bit strong.
> > >>>>>>>>>
> > >>>>>>>>> *** (Note, for the curious reader....) Our use-case/query pattern
> > >>>> is a
> > >>>>>>>> bit
> > >>>>>>>>> complex, but reading "uncommitted" records is actually safe in
> > our
> > >>>>> case
> > >>>>>>>>> because processing is deterministic. Additionally, IQ being able
> > to
> > >>>>> read
> > >>>>>>>>> uncommitted records is crucial to enable "read your own writes"
> > on
> > >>>> our
> > >>>>>>>> API:
> > >>>>>>>>> Due to the deterministic processing, we send an "ack" to the
> > client
> > >>>>> who
> > >>>>>>>>> makes the request as soon as the processor processes the result.
> > If
> > >>>>> they
> > >>>>>>>>> can't read uncommitted records, they may receive a "201 -
> > Created"
> > >>>>>>>>> response, immediately followed by a "404 - Not Found" when doing
> > a
> > >>>>>>>> lookup
> > >>>>>>>>> for the object they just created).
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Colt McNealy
> > >>>>>>>>>
> > >>>>>>>>> *Founder, LittleHorse.dev*
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
> > >>>> nick.telf...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Addendum:
> > >>>>>>>>>>
> > >>>>>>>>>> I think we would also face the same problem with the approach
> > John
> > >>>>>>>> outlined
> > >>>>>>>>>> earlier (using the record cache as a transaction buffer and
> > >>>> flushing
> > >>>>>>>> it
> > >>>>>>>>>> straight to SST files). This is because the record cache (the
> > >>>>>>>> ThreadCache
> > >>>>>>>>>> class) is not thread-safe, so every commit would invalidate open
> > >> IQ
> > >>>>>>>>>> Iterators in the same way that RocksDB WriteBatches do.
> > >>>>>>>>>> --
> > >>>>>>>>>> Nick
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <
> > >>>> nick.telf...@gmail.com>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I've updated the KIP based on our conversation. The only things
> > >>>>>>>> I've not
> > >>>>>>>>>>> yet done are:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. Using transactions under ALOS and EOS.
> > >>>>>>>>>>> 2. Making IsolationLevel a query-time constraint, rather than
> > >>>>>>>> linking it
> > >>>>>>>>>>> to the processing.guarantee.
> > >>>>>>>>>>>
> > >>>>>>>>>>> There's a wrinkle that makes this a challenge: Interactive Queries that
> > >>>>>>>>>>> open an Iterator, when using transactions and READ_UNCOMMITTED.
> > >>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries need to be able to
> > >>>>>>>>>>> read records from the currently uncommitted transaction buffer
> > >>>>>>>>>>> (WriteBatch). This includes Iterators, which should iterate both the
> > >>>>>>>>>>> transaction buffer and the underlying database (using
> > >>>>>>>>>>> WriteBatchWithIndex#newIteratorWithBase()).
> > >>>>>>>>>>>
> > >>>>>>>>>>> The issue is that when the StreamThread commits, it writes the current
> > >>>>>>>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the
> > >>>>>>>>>>> WriteBatch while an Interactive Query holds an open Iterator on it will
> > >>>>>>>>>>> invalidate the Iterator. Worse, it turns out that Iterators over a
> > >>>>>>>>>>> WriteBatch become invalidated not just when the WriteBatch is cleared, but
> > >>>>>>>>>>> also when the Iterator's current key receives a new write.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Now that I'm writing this, I remember that this is the major
> > >>>> reason
> > >>>>>>>> that
> > >>>>>>>>>> I
> > >>>>>>>>>>> switched the original design from having a query-time
> > >>>>>>>> IsolationLevel to
> > >>>>>>>>>>> having the IsolationLevel linked to the transactionality of the
> > >>>>>>>> stores
> > >>>>>>>>>>> themselves.
> > >>>>>>>>>>>
> > >>>>>>>>>>> It *might* be possible to resolve this, by having a "chain" of
> > >>>>>>>>>>> WriteBatches, with the StreamThread switching to a new
> > WriteBatch
> > >>>>>>>>>> whenever
> > >>>>>>>>>>> a new Interactive Query attempts to read from the database, but
> > >>>> that
> > >>>>>>>>>> could
> > >>>>>>>>>>> cause some performance problems/memory pressure when subjected
> > to
> > >>>> a
> > >>>>>>>> high
> > >>>>>>>>>>> Interactive Query load. It would also reduce the efficiency of
> > >>>>>>>>>> WriteBatches
> > >>>>>>>>>>> on-commit, as we'd have to write N WriteBatches, where N is the
> > >>>>>>>> number of
> > >>>>>>>>>>> Interactive Queries since the last commit.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I realise this is getting into the weeds of the implementation,
> > >>>> and
> > >>>>>>>> you'd
> > >>>>>>>>>>> rather we focus on the API for now, but I think it's important
> > to
> > >>>>>>>>>> consider
> > >>>>>>>>>>> how to implement the desired API, in case we come up with an
> > API
> > >>>>>>>> that
> > >>>>>>>>>>> cannot be implemented efficiently, or even at all!
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thoughts?
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Nick
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
> > cado...@apache.org
> > >>>
> > >>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 6.
> > >>>>>>>>>>>> Of course, you are right! My bad!
> > >>>>>>>>>>>> Wiping out the state in the downgrading case is fine.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 3a.
> > >>>>>>>>>>>> Focus on the public facing changes for the KIP. We will manage
> > >> to
> > >>>>>>>> get
> > >>>>>>>>>>>> the internals right. Regarding state stores that do not
> > support
> > >>>>>>>>>>>> READ_COMMITTED, they should throw an error stating that they
> > do
> > >>>> not
> > >>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state stores
> > >>>>>>>> immediately.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 3b.
> > >>>>>>>>>>>> I am in favor of using transactions also for ALOS.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> > >>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for getting back to me!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>> The fact that implementations can always track estimated
> > memory
> > >>>>>>>> usage
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as an option,
> > and
> > >>>>>>>> I'll
> > >>>>>>>>>>>> clarify
> > >>>>>>>>>>>>> the JavaDoc that 0 is not just for non-transactional stores,
> > >>>>>>>> which is
> > >>>>>>>>>>>>> currently misleading.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>> The problem with catching the exception in the downgrade
> > >> process
> > >>>>>>>> is
> > >>>>>>>>>> that
> > >>>>>>>>>>>>> would require new code in the Kafka version being downgraded
> > >> to.
> > >>>>>>>> Since
> > >>>>>>>>>>>>> users could conceivably downgrade to almost *any* older
> > version
> > >>>>>>>> of
> > >>>>>>>>>> Kafka
> > >>>>>>>>>>>>> Streams, I'm not sure how we could add that code?
> > >>>>>>>>>>>>> The only way I can think of doing it would be to provide a
> > >>>>>>>> dedicated
> > >>>>>>>>>>>>> downgrade tool, that goes through every local store and
> > removes
> > >>>>>>>> the
> > >>>>>>>>>>>>> offsets column families. But that seems like an unnecessary
> > >>>>>>>> amount of
> > >>>>>>>>>>>> extra
> > >>>>>>>>>>>>> code to maintain just to handle a somewhat niche situation,
> > >> when
> > >>>>>>>> the
> > >>>>>>>>>>>>> alternative (automatically wipe and restore stores) should be
> > >>>>>>>>>>>> acceptable.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've requested.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3a.
> > >>>>>>>>>>>>> I agree that IsolationLevel makes more sense at query-time,
> > and
> > >>>> I
> > >>>>>>>>>>>> actually
> > >>>>>>>>>>>>> initially attempted to place the IsolationLevel at
> > query-time,
> > >>>>>>>> but I
> > >>>>>>>>>> ran
> > >>>>>>>>>>>>> into some problems:
> > >>>>>>>>>>>>> - The key issue is that, under ALOS we're not staging writes
> > in
> > >>>>>>>>>>>>> transactions, so can't perform writes at the READ_COMMITTED
> > >>>>>>>> isolation
> > >>>>>>>>>>>>> level. However, this may be addressed if we decide to
> > *always*
> > >>>>>>>> use
> > >>>>>>>>>>>>> transactions as discussed under 3b.
> > >>>>>>>>>>>>> - IQv1 and IQv2 have quite different implementations. I
> > >> remember
> > >>>>>>>>>> having
> > >>>>>>>>>>>>> some difficulty understanding the IQv1 internals, which made
> > it
> > >>>>>>>>>>>> difficult
> > >>>>>>>>>>>>> to determine what needed to be changed. However, I *think*
> > this
> > >>>>>>>> can be
> > >>>>>>>>>>>>> addressed for both implementations by wrapping the
> > RocksDBStore
> > >>>>>>>> in an
> > >>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides read methods
> > >>>>>>>> (get,
> > >>>>>>>>>>>> etc.)
> > >>>>>>>>>>>>> to either read directly from the database or from the ongoing
> > >>>>>>>>>>>> transaction.
> > >>>>>>>>>>>>> But IQv1 might still be difficult.
> > >>>>>>>>>>>>> - If IsolationLevel becomes a query constraint, then all
> > other
> > >>>>>>>>>>>> StateStores
> > >>>>>>>>>>>>> will need to respect it, including the in-memory stores. This
> > >>>>>>>> would
> > >>>>>>>>>>>> require
> > >>>>>>>>>>>>> us to adapt in-memory stores to stage their writes so they
> > can
> > >>>> be
> > >>>>>>>>>>>> isolated
> > >>>>>>>>>>>>> from READ_COMMITTED queries. It would also become an
> > important
> > >>>>>>>>>>>>> consideration for third-party stores on upgrade, as without
> > >>>>>>>> changes,
> > >>>>>>>>>>>> they
> > >>>>>>>>>>>>> would not support READ_COMMITTED queries correctly.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Ultimately, I may need some help making the necessary change
> > to
> > >>>>>>>> IQv1
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> support this, but I don't think it's fundamentally
> > impossible,
> > >>>>>>>> if we
> > >>>>>>>>>>>> want
> > >>>>>>>>>>>>> to pursue this route.
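
The IsolationLevel-dependent wrapper described under 3a can be sketched roughly as follows. This is a minimal illustration only: `StagedStore` and `ReadIsolation` are hypothetical names, plain maps stand in for RocksDB and its write batch, and nothing here is the actual Kafka Streams or RocksDB API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Kafka's IsolationLevel enum.
enum ReadIsolation { READ_COMMITTED, READ_UNCOMMITTED }

// Hypothetical store: a map plays the role of the database, a second map
// plays the role of the ongoing transaction (staged, uncommitted writes).
final class StagedStore {
    private final Map<String, String> committed = new HashMap<>();
    // An empty Optional marks a staged delete.
    private final Map<String, Optional<String>> staged = new HashMap<>();

    void put(String key, String value) {
        staged.put(key, Optional.ofNullable(value));
    }

    // Applies all staged writes to the committed view, then clears the buffer.
    void commit() {
        staged.forEach((k, v) -> {
            if (v.isPresent()) {
                committed.put(k, v.get());
            } else {
                committed.remove(k);
            }
        });
        staged.clear();
    }

    // READ_UNCOMMITTED consults the ongoing transaction first;
    // READ_COMMITTED reads only what has been committed.
    String get(String key, ReadIsolation level) {
        if (level == ReadIsolation.READ_UNCOMMITTED && staged.containsKey(key)) {
            return staged.get(key).orElse(null);
        }
        return committed.get(key);
    }
}
```

With this shape, the isolation level is chosen per read, which is what would allow it to become a query-time parameter rather than a store-wide setting.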
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3b.
> > >>>>>>>>>>>>> The main reason I chose to keep ALOS un-transactional was to
> > >>>>>>>> minimize
> > >>>>>>>>>>>>> behavioural change for most users (I believe most Streams
> > users
> > >>>>>>>> use
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> default configuration, which is ALOS). That said, it's clear
> > >>>>>>>> that if
> > >>>>>>>>>>>> ALOS
> > >>>>>>>>>>>>> also used transactional stores, the only change in behaviour
> > >>>>>>>> would be
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>> it would become *more correct*, which could be considered a
> > >> "bug
> > >>>>>>>> fix"
> > >>>>>>>>>> by
> > >>>>>>>>>>>>> users, rather than a change they need to handle.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I believe that performance using transactions (aka. RocksDB
> > >>>>>>>>>>>> WriteBatches)
> > >>>>>>>>>>>>> should actually be *better* than the un-batched write-path
> > that
> > >>>>>>>> is
> > >>>>>>>>>>>>> currently used[1]. The only "performance" consideration will
> > be
> > >>>>>>>> the
> > >>>>>>>>>>>>> increased memory usage that transactions require. Given the
> > >>>>>>>>>> mitigations
> > >>>>>>>>>>>> for
> > >>>>>>>>>>>>> this memory that we have in place, I would expect that this
> > is
> > >>>>>>>> not a
> > >>>>>>>>>>>>> problem for most users.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> If we're happy to do so, we can make ALOS also use
> > >> transactions.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Link 1:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>
> > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
> > >>>> cado...@apache.org
> > >>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks for the updates and sorry for the delay on my side!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>> Making the default implementation for flush() a no-op sounds
> > >>>>>>>> good to
> > >>>>>>>>>>>> me.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>> I think what was bugging me here is that a third-party state
> > >>>>>>>> store
> > >>>>>>>>>>>> needs
> > >>>>>>>>>>>>>> to implement the state store interface. That means they need
> > >> to
> > >>>>>>>>>>>>>> implement a wrapper around the actual state store as we do
> > for
> > >>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>> with RocksDBStore. So, a third-party state store can always
> > >>>>>>>> estimate
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> uncommitted bytes, if it wants, because the wrapper can
> > record
> > >>>>>>>> the
> > >>>>>>>>>>>> added
> > >>>>>>>>>>>>>> bytes.
> > >>>>>>>>>>>>>> One case I can think of where returning -1 makes sense is
> > when
> > >>>>>>>>>> Streams
> > >>>>>>>>>>>>>> does not need to estimate the size of the write batch and
> > >>>>>>>> trigger
> > >>>>>>>>>>>>>> extraordinary commits, because the third-party state store
> > >>>>>>>> takes care
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>> memory. But in that case the method could also just return
> > 0.
> > >>>>>>>> Even
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>> case would be better solved with a method that returns
> > whether
> > >>>>>>>> the
> > >>>>>>>>>>>> state
> > >>>>>>>>>>>>>> store itself manages the memory used for uncommitted bytes
> > or
> > >>>>>>>> not.
> > >>>>>>>>>>>>>> That said, I am fine with keeping the -1 return value; I was
> > >>>>>>>> just
> > >>>>>>>>>>>>>> wondering when and if it will be used.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Regarding returning 0 for transactional state stores when
> > the
> > >>>>>>>> batch
> > >>>>>>>>>> is
> > >>>>>>>>>>>>>> empty, I was just wondering because you explicitly stated
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> "or {@code 0} if this StateStore does not support
> > >>>> transactions."
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> So it seemed to me returning 0 could only happen for
> > >>>>>>>>>> non-transactional
> > >>>>>>>>>>>>>> state stores.
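
The point that a wrapper can always record the added bytes can be sketched like this. `ByteTrackingStore` is a hypothetical class, the inner map stands in for an arbitrary third-party store, and the method name merely mirrors the one discussed in the thread; the real interface may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper around a third-party store that cannot report
// its own memory usage. The wrapper counts bytes as they are staged.
final class ByteTrackingStore {
    private final Map<String, byte[]> inner = new HashMap<>();   // stands in for the wrapped store
    private final Map<String, byte[]> pending = new HashMap<>(); // writes awaiting commit
    private long uncommittedBytes = 0L;

    void put(String key, byte[] value) {
        pending.put(key, value);
        // Approximation: overwriting the same key is counted twice,
        // so this is an upper-bound estimate.
        uncommittedBytes += key.getBytes(StandardCharsets.UTF_8).length + value.length;
    }

    // Returns 0 when nothing is awaiting commit.
    long approximateNumUncommittedBytes() {
        return uncommittedBytes;
    }

    void commit() {
        inner.putAll(pending);
        pending.clear();
        uncommittedBytes = 0L;
    }
}
```

Under this assumption, 0 simply means "nothing awaiting commit", which applies to transactional and non-transactional stores alike.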
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> a) What do you think if we move the isolation level to IQ
> > (v1
> > >>>>>>>> and
> > >>>>>>>>>> v2)?
> > >>>>>>>>>>>>>> In the end this is the only component that really needs to
> > >>>>>>>> specify
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>> isolation level. It is similar to the Kafka consumer that
> > can
> > >>>>>>>> choose
> > >>>>>>>>>>>>>> with what isolation level to read the input topic.
> > >>>>>>>>>>>>>> For IQv1 the isolation level should go into
> > >>>>>>>> StoreQueryParameters. For
> > >>>>>>>>>>>>>> IQv2, I would add it to the Query interface.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> b) Point a) raises the question of what should happen during
> > >>>>>>>>>> at-least-once
> > >>>>>>>>>>>>>> processing when the state store does not use transactions?
> > >> John
> > >>>>>>>> in
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>> past proposed to also use transactions on state stores for
> > >>>>>>>>>>>>>> at-least-once. I like that idea, because it avoids
> > aggregating
> > >>>>>>>> the
> > >>>>>>>>>> same
> > >>>>>>>>>>>>>> records over and over again in the case of a failure. We
> > had a
> > >>>>>>>> case
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>> the past where a Streams application in at-least-once mode
> > >> was
> > >>>>>>>>>> failing
> > >>>>>>>>>>>>>> continuously for some reasons I do not remember before
> > >>>>>>>> committing the
> > >>>>>>>>>>>>>> offsets. After each failover, the app aggregated again and
> > >>>>>>>> again the
> > >>>>>>>>>>>>>> same records. Of course the aggregate increased to very
> > wrong
> > >>>>>>>> values
> > >>>>>>>>>>>>>> just because of the failover. With transactions on the state
> > >>>>>>>> stores
> > >>>>>>>>>> we
> > >>>>>>>>>>>>>> could have avoided this. The app would have output the same
> > >>>>>>>> aggregate
> > >>>>>>>>>>>>>> multiple times (i.e., after each failover) but at least the
> > >>>>>>>> value of
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> aggregate would not depend on the number of failovers.
> > >>>>>>>> Outputting the
> > >>>>>>>>>>>>>> same aggregate multiple times would be incorrect under
> > >>>>>>>> exactly-once
> > >>>>>>>>>> but
> > >>>>>>>>>>>>>> it is OK for at-least-once.
> > >>>>>>>>>>>>>> If it makes sense to add a config to turn on and off
> > >>>>>>>> transactions on
> > >>>>>>>>>>>>>> state stores under at-least-once or just use transactions in
> > >>>>>>>> any case
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> a question we should also discuss in this KIP. It depends a
> > >> bit
> > >>>>>>>> on
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>> performance trade-off. Maybe to be safe, I would add a
> > config.
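
The repeated-failover scenario in 3b can be illustrated with a toy simulation. This is a deliberately simplified model, not Streams behaviour in detail: `FailoverSimulation` is a hypothetical class, the aggregate is a plain count, and each crash is assumed to happen after the batch is processed but before offsets are committed.

```java
// Toy model of a counting aggregation under at-least-once processing.
final class FailoverSimulation {

    // Returns the count held by the store after `failures` crashes followed by
    // one successful run, each attempt processing the same `records` inputs.
    static long finalCount(int records, int failures, boolean transactionalStore) {
        long storeValue = 0L; // what the store holds across restarts
        for (int attempt = 0; attempt <= failures; attempt++) {
            long working = storeValue;
            for (int i = 0; i < records; i++) {
                working++; // aggregate one record
            }
            boolean crashed = attempt < failures;
            if (!transactionalStore) {
                // Writes reach the store immediately and survive the crash.
                storeValue = working;
            } else if (!crashed) {
                // Writes are staged and only applied when the task commits.
                storeValue = working;
            }
            // After a crash the offsets were never committed, so the same
            // records are processed again on the next attempt.
        }
        return storeValue;
    }
}
```

For example, `finalCount(10, 3, false)` yields 40 while `finalCount(10, 3, true)` yields 10: with staged writes the aggregate no longer depends on the number of failovers.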
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>> Your points are all valid. I tend to say to keep the metrics
> > >>>>>>>> around
> > >>>>>>>>>>>>>> flush() until we remove flush() completely from the
> > interface.
> > >>>>>>>> Calls
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>> flush() might still exist since existing processors might
> > >> still
> > >>>>>>>> call
> > >>>>>>>>>>>>>> flush() explicitly as you mentioned in 1). For sure, we need
> > >> to
> > >>>>>>>>>>>> document
> > >>>>>>>>>>>>>> how the metrics change due to the transactions in the
> > upgrade
> > >>>>>>>> notes.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>> I see. Then you should describe how the .position files are
> > >>>>>>>> handled
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>> a dedicated section of the KIP or incorporate the
> > description
> > >>>>>>>> in the
> > >>>>>>>>>>>>>> "Atomic Checkpointing" section instead of only mentioning it
> > >> in
> > >>>>>>>> the
> > >>>>>>>>>>>>>> "Compatibility, Deprecation, and Migration Plan".
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP is a good
> > >> idea.
> > >>>>>>>>>>>>>> Regarding downgrading, I think you could also catch the
> > >>>>>>>> exception and
> > >>>>>>>>>>>> do
> > >>>>>>>>>>>>>> what is needed to downgrade, e.g., drop the column family.
> > See
> > >>>>>>>> here
> > >>>>>>>>>> for
> > >>>>>>>>>>>>>> an example:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>
> > >>>>
> > >>
> > https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> It is a bit brittle, but it works.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> > >>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for taking the time to review the KIP. I'm back from
> > >>>>>>>> leave
> > >>>>>>>>>> now
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>> intend to move this forwards as quickly as I can.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Addressing your points:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>> Because flush() is part of the StateStore API, it's exposed
> > >> to
> > >>>>>>>>>> custom
> > >>>>>>>>>>>>>>> Processors, which might be making calls to flush(). This
> > was
> > >>>>>>>>>> actually
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> case in a few integration tests.
> > >>>>>>>>>>>>>>> To maintain as much compatibility as possible, I'd prefer
> > not
> > >>>>>>>> to
> > >>>>>>>>>> make
> > >>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>> an UnsupportedOperationException, as it will cause
> > previously
> > >>>>>>>>>> working
> > >>>>>>>>>>>>>>> Processors to start throwing exceptions at runtime.
> > >>>>>>>>>>>>>>> I agree that it doesn't make sense for it to proxy
> > commit(),
> > >>>>>>>> though,
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>>> that would cause it to violate the "StateStores commit only
> > >>>>>>>> when the
> > >>>>>>>>>>>> Task
> > >>>>>>>>>>>>>>> commits" rule.
> > >>>>>>>>>>>>>>> Instead, I think we should make this a no-op. That way,
> > >>>>>>>> existing
> > >>>>>>>>>> user
> > >>>>>>>>>>>>>>> Processors will continue to work as-before, without
> > violation
> > >>>>>>>> of
> > >>>>>>>>>> store
> > >>>>>>>>>>>>>>> consistency that would be caused by premature flush/commit
> > of
> > >>>>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>> data to disk.
> > >>>>>>>>>>>>>>> What do you think?
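
The no-op proposal for flush() can be sketched as a default interface method. `SketchStore` and `CountingStore` are hypothetical names used only for illustration; the real StateStore interface has many more methods and may declare these differently.

```java
// Hypothetical, heavily reduced store interface.
interface SketchStore {
    // Called by the engine when the owning task commits.
    void commit();

    // Kept so existing processors that call flush() keep working.
    // Does nothing by default, so it can neither throw nor trigger
    // a premature commit.
    default void flush() {
        // intentionally empty
    }
}

// Hypothetical implementation that counts commits.
final class CountingStore implements SketchStore {
    int commits = 0;

    @Override
    public void commit() {
        commits++;
    }
}
```

A processor calling `flush()` on a `CountingStore` leaves `commits` unchanged, which is the property the "StateStores commit only when the Task commits" rule needs.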
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore implementation
> > is
> > >>>>>>>>>>>>>>> transactional, but is unable to estimate the uncommitted
> > >>>> memory
> > >>>>>>>>>> usage,
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> method will return -1.
> > >>>>>>>>>>>>>>> The intention here is to permit third-party implementations
> > >>>>>>>> that may
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>> able to estimate memory usage.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been written to the
> > store
> > >>>>>>>> yet. I
> > >>>>>>>>>>>>>> thought
> > >>>>>>>>>>>>>>> that was implied by "This method will return an
> > approximation
> > >>>>>>>> of the
> > >>>>>>>>>>>>>> memory
> > >>>>>>>>>>>>>>> would be freed by the next call to {@link #commit(Map)}"
> > and
> > >>>>>>>>>> "@return
> > >>>>>>>>>>>> The
> > >>>>>>>>>>>>>>> approximate size of all records awaiting {@link
> > >>>> #commit(Map)}",
> > >>>>>>>>>>>> however,
> > >>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>> can add it explicitly to the JavaDoc if you think this is
> > >>>>>>>> unclear?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>> I realise this is probably the most contentious point in my
> > >>>>>>>> design,
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>> I'm
> > >>>>>>>>>>>>>>> open to changing it if I'm unable to convince you of the
> > >>>>>>>> benefits.
> > >>>>>>>>>>>>>>> Nevertheless, here's my argument:
> > >>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are directly provided
> > >>>>>>>> StateStores
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>> query, and it may be important for users to
> > programmatically
> > >>>>>>>> know
> > >>>>>>>>>>>> which
> > >>>>>>>>>>>>>>> mode the StateStore is operating under. If we simply
> > provide
> > >>>> an
> > >>>>>>>>>>>>>>> "eosEnabled" boolean (as used throughout the internal
> > streams
> > >>>>>>>>>>>> engine), or
> > >>>>>>>>>>>>>>> similar, then users will need to understand the operation
> > and
> > >>>>>>>>>>>>>> consequences
> > >>>>>>>>>>>>>>> of each available processing mode and how it pertains to
> > >> their
> > >>>>>>>>>>>>>> StateStore.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Interactive Query users aren't the only people that care
> > >> about
> > >>>>>>>> the
> > >>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore:
> > implementers
> > >>>> of
> > >>>>>>>>>> custom
> > >>>>>>>>>>>>>>> StateStores also need to understand the behaviour expected
> > of
> > >>>>>>>> their
> > >>>>>>>>>>>>>>> implementation. KIP-892 introduces some assumptions into
> > the
> > >>>>>>>> Streams
> > >>>>>>>>>>>>>> Engine
> > >>>>>>>>>>>>>>> about how StateStores operate under each processing mode,
> > and
> > >>>>>>>> it's
> > >>>>>>>>>>>>>>> important that custom implementations adhere to those
> > >>>>>>>> assumptions in
> > >>>>>>>>>>>>>> order
> > >>>>>>>>>>>>>>> to maintain the consistency guarantees.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> IsolationLevels provide a high-level contract on the
> > >> behaviour
> > >>>>>>>> of
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> StateStore: a user knows that under READ_COMMITTED, they
> > will
> > >>>>>>>> see
> > >>>>>>>>>>>> writes
> > >>>>>>>>>>>>>>> only after the Task has committed, and under
> > READ_UNCOMMITTED
> > >>>>>>>> they
> > >>>>>>>>>>>> will
> > >>>>>>>>>>>>>> see
> > >>>>>>>>>>>>>>> writes immediately. No understanding of the details of each
> > >>>>>>>>>>>>>> processing.mode
> > >>>>>>>>>>>>>>> is required, either for IQ users or StateStore
> > implementers.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> An argument can be made that these contractual guarantees
> > can
> > >>>>>>>> simply
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>>> documented for the processing.mode (i.e. that exactly-once
> > >> and
> > >>>>>>>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and
> > at-least-once
> > >>>>>>>> behaves
> > >>>>>>>>>>>> like
> > >>>>>>>>>>>>>>> READ_UNCOMMITTED), but there are several small issues with
> > >>>>>>>> this I'd
> > >>>>>>>>>>>>>> prefer
> > >>>>>>>>>>>>>>> to avoid:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>         - Where would we document these contracts, in a way
> > >> that
> > >>>>>>>> is
> > >>>>>>>>>>>> difficult
> > >>>>>>>>>>>>>>>         for users/implementers to miss/ignore?
> > >>>>>>>>>>>>>>>         - It's not clear to users that the processing mode
> > is
> > >>>>>>>>>>>> communicating
> > >>>>>>>>>>>>>>>         an expectation of read isolation, unless they read
> > the
> > >>>>>>>>>>>>>> documentation. Users
> > >>>>>>>>>>>>>>>         rarely consult documentation unless they feel they
> > >> need
> > >>>>>>>> to, so
> > >>>>>>>>>>>> it's
> > >>>>>>>>>>>>>> likely
> > >>>>>>>>>>>>>>>         this detail would get missed by many users.
> > >>>>>>>>>>>>>>>         - It tightly couples processing modes to read
> > >> isolation.
> > >>>>>>>> Adding
> > >>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>         processing modes, or changing the read isolation of
> > >>>>>>>> existing
> > >>>>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>         modes would be difficult/impossible.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Ultimately, the cost of introducing IsolationLevels is
> > just a
> > >>>>>>>> single
> > >>>>>>>>>>>>>>> method, since we re-use the existing IsolationLevel enum
> > from
> > >>>>>>>> Kafka.
> > >>>>>>>>>>>> This
> > >>>>>>>>>>>>>>> gives us a clear place to document the contractual
> > guarantees
> > >>>>>>>>>> expected
> > >>>>>>>>>>>>>>> of/provided by StateStores, that is accessible both by the
> > >>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>> itself, and by IQ users.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> (Writing this I've just realised that the StateStore and IQ
> > >>>>>>>> APIs
> > >>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>> don't provide access to StateStoreContext that IQ users
> > would
> > >>>>>>>> have
> > >>>>>>>>>>>> direct
> > >>>>>>>>>>>>>>> access to... Perhaps StateStore should expose
> > >> isolationLevel()
> > >>>>>>>>>> itself
> > >>>>>>>>>>>>>> too?)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>>> Yeah, I'm not comfortable renaming the metrics in-place
> > >>>>>>>> either, as
> > >>>>>>>>>>>> it's a
> > >>>>>>>>>>>>>>> backwards incompatible change. My concern is that, if we
> > >> leave
> > >>>>>>>> the
> > >>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>> "flush" metrics in place, they will be confusing to users.
> > >>>>>>>> Right
> > >>>>>>>>>> now,
> > >>>>>>>>>>>>>>> "flush" metrics record explicit flushes to disk, but under
> > >>>>>>>> KIP-892,
> > >>>>>>>>>>>> even
> > >>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>> commit() will not explicitly flush data to disk - RocksDB
> > >> will
> > >>>>>>>>>> decide
> > >>>>>>>>>>>> on
> > >>>>>>>>>>>>>>> when to flush memtables to disk itself.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> If we keep the existing "flush" metrics, we'd have two
> > >>>> options,
> > >>>>>>>>>> which
> > >>>>>>>>>>>>>> both
> > >>>>>>>>>>>>>>> seem pretty bad to me:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>         1. Have them record calls to commit(), which would
> > be
> > >>>>>>>>>>>> misleading, as
> > >>>>>>>>>>>>>>>         data is no longer explicitly "flushed" to disk by
> > this
> > >>>>>>>> call.
> > >>>>>>>>>>>>>>>         2. Have them record nothing at all, which is
> > >> equivalent
> > >>>> to
> > >>>>>>>>>>>> removing
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>         metrics, except that users will see the metric
> > still
> > >>>>>>>> exists and
> > >>>>>>>>>>>> so
> > >>>>>>>>>>>>>> assume
> > >>>>>>>>>>>>>>>         that the metric is correct, and that there's a
> > problem
> > >>>>>>>> with
> > >>>>>>>>>> their
> > >>>>>>>>>>>>>> system
> > >>>>>>>>>>>>>>>         when there isn't.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I agree that removing them is also a bad solution, and I'd
> > >>>>>>>> like some
> > >>>>>>>>>>>>>>> guidance on the best path forward here.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>>> Position files are updated on every write to a StateStore.
> > >>>>>>>> Since our
> > >>>>>>>>>>>>>> writes
> > >>>>>>>>>>>>>>> are now buffered until commit(), we can't update the
> > Position
> > >>>>>>>> file
> > >>>>>>>>>>>> until
> > >>>>>>>>>>>>>>> commit() has been called, otherwise it would be
> > inconsistent
> > >>>>>>>> with
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>> data
> > >>>>>>>>>>>>>>> in the event of a rollback. Consequently, we need to manage
> > >>>>>>>> these
> > >>>>>>>>>>>> offsets
> > >>>>>>>>>>>>>>> the same way we manage the checkpoint offsets, and ensure
> > >>>>>>>> they're
> > >>>>>>>>>> only
> > >>>>>>>>>>>>>>> written on commit().
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>> Agreed, although I'm not exactly sure yet what tests to
> > >> write.
> > >>>>>>>> How
> > >>>>>>>>>>>>>> explicit
> > >>>>>>>>>>>>>>> do we need to be here in the KIP?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be
> > seamless,
> > >>>>>>>> and we
> > >>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>> definitely add some tests around that. Downgrade, it
> > >>>>>>>> transpires,
> > >>>>>>>>>> isn't
> > >>>>>>>>>>>>>>> currently possible, as the extra column family for offset
> > >>>>>>>> storage is
> > >>>>>>>>>>>>>>> incompatible with the pre-KIP-892 implementation: when you
> > >>>>>>>> open a
> > >>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>>> database, you must open all available column families or
> > >>>>>>>> receive an
> > >>>>>>>>>>>>>> error.
> > >>>>>>>>>>>>>>> What currently happens on downgrade is that it attempts to
> > >>>>>>>> open the
> > >>>>>>>>>>>>>> store,
> > >>>>>>>>>>>>>>> throws an error about the offsets column family not being
> > >>>>>>>> opened,
> > >>>>>>>>>>>> which
> > >>>>>>>>>>>>>>> triggers a wipe and rebuild of the Task. Given that
> > >> downgrades
> > >>>>>>>>>> should
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>>> uncommon, I think this is acceptable behaviour, as the
> > >>>>>>>> end-state is
> > >>>>>>>>>>>>>>> consistent, even if it results in an undesirable state
> > >>>> restore.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Should I document the upgrade/downgrade behaviour
> > explicitly
> > >>>>>>>> in the
> > >>>>>>>>>>>> KIP?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
> > >>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Nick!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the updates!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>> Why does StateStore#flush() default to
> > >>>>>>>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> > >>>>>>>>>>>>>>>> Since calls to flush() will not exist anymore after this
> > KIP
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>> released, I would rather throw an unsupported operation
> > >>>>>>>> exception
> > >>>>>>>>>> by
> > >>>>>>>>>>>>>>>> default.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>> When would a state store return -1 from
> > >>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while being
> > >>>>>>>>>>>> transactional?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Wouldn't StateStore#approximateNumUncommittedBytes() also
> > >>>>>>>> return 0
> > >>>>>>>>>> if
> > >>>>>>>>>>>>>>>> the state store is transactional but nothing has been
> > >> written
> > >>>>>>>> to
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>> state store yet?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>> Sorry for bringing this up again. Does this KIP really
> > need
> > >>>> to
> > >>>>>>>>>>>> introduce
> > >>>>>>>>>>>>>>>> StateStoreContext#isolationLevel()? StateStoreContext has
> > >>>>>>>> already
> > >>>>>>>>>>>>>>>> appConfigs() which basically exposes the same information,
> > >>>>>>>> i.e., if
> > >>>>>>>>>>>> EOS
> > >>>>>>>>>>>>>>>> is enabled or not.
> > >>>>>>>>>>>>>>>> In one of your previous e-mails you wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> "My idea was to try to keep the StateStore interface as
> > >>>>>>>> loosely
> > >>>>>>>>>>>> coupled
> > >>>>>>>>>>>>>>>> from the Streams engine as possible, to give implementers
> > >>>> more
> > >>>>>>>>>>>> freedom,
> > >>>>>>>>>>>>>>>> and reduce the amount of internal knowledge required."
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> While I understand the intent, I doubt that it decreases
> > the
> > >>>>>>>>>>>> coupling of
> > >>>>>>>>>>>>>>>> a StateStore interface and the Streams engine.
> > >> READ_COMMITTED
> > >>>>>>>> only
> > >>>>>>>>>>>>>>>> applies to IQ but not to reads by processors. Thus,
> > >>>>>>>> implementers
> > >>>>>>>>>>>> need to
> > >>>>>>>>>>>>>>>> understand how Streams accesses the state stores.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I would like to hear what others think about this.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>>>> Great to see new metrics exposed for transactional state stores!
> > >>>>>>>>>> However, I
> > >>>>>>>>>>>>>>>> would prefer to add new metrics and deprecate (in the
> > docs)
> > >>>>>>>> the old
> > >>>>>>>>>>>>>>>> ones. You can find examples of deprecated metrics here:
> > >>>>>>>>>>>>>>>>
> > https://kafka.apache.org/documentation/#selector_monitoring
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>>>> Why does the KIP mention position files? I do not think
> > they
> > >>>>>>>> are
> > >>>>>>>>>>>> related
> > >>>>>>>>>>>>>>>> to transactions or flushes.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>> I think we will also need to adapt/add integration tests
> > >>>>>>>> besides
> > >>>>>>>>>> unit
> > >>>>>>>>>>>>>>>> tests. Additionally, we probably need integration or
> > system
> > >>>>>>>> tests
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>> verify that upgrades and downgrades between transactional
> > >> and
> > >>>>>>>>>>>>>>>> non-transactional state stores work as expected.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>> One more thing: I noted John's suggestion in the KIP,
> > under
> > >>>>>>>>>>>> "Rejected
> > >>>>>>>>>>>>>>>>> Alternatives". I still think it's an idea worth pursuing,
> > >>>>>>>> but I
> > >>>>>>>>>>>> believe
> > >>>>>>>>>>>>>>>>> that it's out of the scope of this KIP, because it
> > solves a
> > >>>>>>>>>>>> different
> > >>>>>>>>>>>>>> set
> > >>>>>>>>>>>>>>>>> of problems to this KIP, and the scope of this one has
> > >>>>>>>> already
> > >>>>>>>>>> grown
> > >>>>>>>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>> large!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
> > >>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I've updated the KIP (
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>
> > >>>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > >>>>>>>>>>>>>>>> )
> > >>>>>>>>>>>>>>>>>> with the latest changes; mostly bringing back "Atomic
> > >>>>>>>>>>>> Checkpointing"
> > >>>>>>>>>>>>>>>> (for
> > >>>>>>>>>>>>>>>>>> what feels like the 10th time!). I think the one thing
> > >>>>>>>> missing is
> > >>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>> changes to metrics (notably the store "flush" metrics
> > will
> > >>>>>>>> need
> > >>>>>>>>>> to
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> renamed to "commit").
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The reason I brought back Atomic Checkpointing was to
> > >>>>>>>> decouple
> > >>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>> from store commit. This is important, because with
> > >>>>>>>> Transactional
> > >>>>>>>>>>>>>>>>>> StateStores, we now need to call "flush" on *every* Task
> > >>>>>>>> commit,
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>> just when the StateStore is closing, otherwise our
> > >>>>>>>> transaction
> > >>>>>>>>>>>> buffer
> > >>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>> never be written and persisted, instead growing
> > unbounded!
> > >>>> I
> > >>>>>>>>>>>>>>>> experimented
> > >>>>>>>>>>>>>>>>>> with some simple solutions, like forcing a store flush
> > >>>>>>>> whenever
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> transaction buffer was likely to exceed its configured
> > >>>>>>>> size, but
> > >>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>> brittle: it prevented the transaction buffer from being
> > >>>>>>>>>> configured
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> unbounded, and it still would have required explicit
> > >>>>>>>> flushes of
> > >>>>>>>>>>>>>> RocksDB,
> > >>>>>>>>>>>>>>>>>> yielding sub-optimal performance and memory utilization.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right" way to
> > >>>>>>>> resolve
> > >>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>> problem. By ensuring that the changelog offsets that
> > >>>>>>>> correspond
> > >>>>>>>>>> to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> most
> > >>>>>>>>>>>>>>>>>> recently written records are always atomically written
> > to
> > >>>>>>>> the
> > >>>>>>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>>>>> (by writing them to the same transaction buffer), we can
> > >>>>>>>> avoid
> > >>>>>>>>>>>>>> forcibly
> > >>>>>>>>>>>>>>>>>> flushing the RocksDB memtables to disk, letting RocksDB
> > >>>>>>>> flush
> > >>>>>>>>>> them
> > >>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>> when necessary, without losing any of our consistency
> > >>>>>>>> guarantees.
> > >>>>>>>>>>>> See
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> updated KIP for more info.
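
The idea behind Atomic Checkpointing, as described above, can be sketched as follows. This is an illustration under stated assumptions: `AtomicCheckpointStore` is a hypothetical class, in-memory maps stand in for the RocksDB data and offsets column families, and a synchronized method stands in for the atomicity a single write batch would provide.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store that commits records and changelog offsets together.
final class AtomicCheckpointStore {
    private final Map<String, String> data = new HashMap<>();   // stands in for the data column family
    private final Map<String, Long> offsets = new HashMap<>();  // stands in for the offsets column family
    private final Map<String, String> buffer = new HashMap<>(); // transaction buffer

    void put(String key, String value) {
        buffer.put(key, value);
    }

    // Records and their changelog offsets are applied in one step, so the
    // stored offsets always match the stored data.
    synchronized void commit(Map<String, Long> changelogOffsets) {
        data.putAll(buffer);
        offsets.putAll(changelogOffsets);
        buffer.clear();
    }

    // Discards staged records; the committed offsets are left untouched.
    synchronized void rollback() {
        buffer.clear();
    }

    String get(String key) {
        return data.get(key);
    }

    Long committedOffset(String changelogPartition) {
        return offsets.get(changelogPartition);
    }
}
```

Because offsets are never written separately from the records they describe, there is no need to force a flush to disk before checkpointing in this model.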
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I have fully implemented these changes, although I'm
> > still
> > >>>>>>>> not
> > >>>>>>>>>>>>>> entirely
> > >>>>>>>>>>>>>>>>>> happy with the implementation for segmented StateStores,
> > >> so
> > >>>>>>>> I
> > >>>>>>>>>> plan
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> refactor that. Despite that, all tests pass. If you'd
> > like
> > >>>>>>>> to try
> > >>>>>>>>>>>> out
> > >>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>> review this highly experimental and incomplete branch,
> > >> it's
> > >>>>>>>>>>>> available
> > >>>>>>>>>>>>>>>> here:
> > >>>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
> > .
> > >>>>>>>> Note:
> > >>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>>> built
> > >>>>>>>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable base to build
> > >>>>>>>> and test
> > >>>>>>>>>>>> it
> > >>>>>>>>>>>>>> on,
> > >>>>>>>>>>>>>>>>>> and to enable easy apples-to-apples comparisons in a
> > live
> > >>>>>>>>>>>>>> environment. I
> > >>>>>>>>>>>>>>>>>> plan to rebase it against trunk once it's nearer
> > >> completion
> > >>>>>>>> and
> > >>>>>>>>>> has
> > >>>>>>>>>>>>>> been
> > >>>>>>>>>>>>>>>>>> proven on our main application.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I would really appreciate help in reviewing and testing:
> > >>>>>>>>>>>>>>>>>> - Segmented (Versioned, Session and Window) stores
> > >>>>>>>>>>>>>>>>>> - Global stores
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I do not currently use either of these, so my primary
> > >>>>>>>> test
> > >>>>>>>>>>>>>>>> environment
> > >>>>>>>>>>>>>>>>>> doesn't test these areas.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I'm going on Parental Leave starting next week for a few
> > >>>>>>>> weeks,
> > >>>>>>>>>> so
> > >>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>> not have time to move this forward until late August.
> > That
> > >>>>>>>> said,
> > >>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>>>> feedback is welcome and appreciated, I just won't be
> > able
> > >>>> to
> > >>>>>>>>>>>> respond
> > >>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>> quickly as usual.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
> > >>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Bruno
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ is not
> > >>>>>>>> something
> > >>>>>>>>>> I
> > >>>>>>>>>>>> had
> > >>>>>>>>>>>>>>>>>>> considered.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> What about atomically updating the state store from the
> > >>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > checkpoint
> > >>>>>>>> (thus,
> > >>>>>>>>>>>>>>>> flushing
> > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data and/or
> > >>>>>>>> number of
> > >>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>> intervals?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting that we
> > >>>>>>>> add an
> > >>>>>>>>>>>>>>>> additional
> > >>>>>>>>>>>>>>>>>>> config for the max number of commit intervals between
> > >>>>>>>>>> checkpoints?
> > >>>>>>>>>>>>>> That
> > >>>>>>>>>>>>>>>>>>> way, we would checkpoint *either* when the transaction
> > >>>>>>>> buffers
> > >>>>>>>>>> are
> > >>>>>>>>>>>>>>>> nearly
> > >>>>>>>>>>>>>>>>>>> full, *OR* whenever a certain number of commit
> > intervals
> > >>>>>>>> have
> > >>>>>>>>>>>>>> elapsed,
> > >>>>>>>>>>>>>>>>>>> whichever comes first?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> That certainly seems reasonable, although this
> > re-ignites
> > >>>>>>>> an
> > >>>>>>>>>>>> earlier
> > >>>>>>>>>>>>>>>>>>> debate about whether a config should be measured in
> > >>>>>>>> "number of
> > >>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>> intervals", instead of just an absolute time.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> FWIW, I realised that this issue is the reason I was
> > >>>>>>>> pursuing
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>> Atomic
> > >>>>>>>>>>>>>>>>>>> Checkpoints, as it de-couples memtable flush from
> > >>>>>>>> checkpointing,
> > >>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>> enables us to just checkpoint on every commit without
> > any
> > >>>>>>>>>>>> performance
> > >>>>>>>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely the "best"
> > >>>>>>>> solution,
> > >>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>> I'm not
> > >>>>>>>>>>>>>>>>>>> sure if this is enough to bring it back into this KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I'm currently working on moving all the transactional
> > >>>> logic
> > >>>>>>>>>>>> directly
> > >>>>>>>>>>>>>>>> into
> > >>>>>>>>>>>>>>>>>>> RocksDBStore itself, which does away with the
> > >>>>>>>>>>>>>> StateStore#newTransaction
> > >>>>>>>>>>>>>>>>>>> method, and reduces the number of new classes
> > introduced,
> > >>>>>>>>>>>>>> significantly
> > >>>>>>>>>>>>>>>>>>> reducing the complexity. If it works, and the
> > complexity
> > >>>> is
> > >>>>>>>>>>>>>> drastically
> > >>>>>>>>>>>>>>>>>>> reduced, I may try bringing back Atomic Checkpoints
> > into
> > >>>>>>>> this
> > >>>>>>>>>> KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <
> > >>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> As far as I understand, you want to atomically update
> > >> the
> > >>>>>>>> state
> > >>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>> from the transaction buffer, flush the memtable of a
> > >>>> state
> > >>>>>>>>>> store
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> write the checkpoint not after the commit time elapsed
> > >>>> but
> > >>>>>>>>>> after
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> transaction buffer reached a size that would lead to
> > >>>>>>>> exceeding
> > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes before the
> > next
> > >>>>>>>> commit
> > >>>>>>>>>>>>>>>> interval
> > >>>>>>>>>>>>>>>>>>>> ends.
> > >>>>>>>>>>>>>>>>>>>> That means, the Kafka transaction would commit every
> > >>>>>>>> commit
> > >>>>>>>>>>>> interval
> > >>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>> the state store will only be atomically updated
> > roughly
> > >>>>>>>> every
> > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of data. Also
> > IQ
> > >>>>>>>> would
> > >>>>>>>>>>>> then
> > >>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>> see new data roughly every
> > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > >>>>>>>>>>>>>>>>>>>> After a failure the state store needs to restore up to
> > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Is this correct?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> What about atomically updating the state store from
> > the
> > >>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > checkpoint
> > >>>>>>>> (thus,
> > >>>>>>>>>>>>>>>> flushing
> > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data and/or
> > >>>>>>>> number of
> > >>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>> intervals? In such a way, we would have the same delay
> > >>>> for
> > >>>>>>>>>>>> records
> > >>>>>>>>>>>>>>>>>>>> appearing in output topics and IQ because both would
> > >>>>>>>> appear
> > >>>>>>>>>> when
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> Kafka transaction is committed. However, after a
> > failure
> > >>>>>>>> the
> > >>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>> still needs to restore up to
> > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> it might restore data that is already in the state
> > store
> > >>>>>>>>>> because
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> checkpoint lags behind the last stable offset (i.e.
> > the
> > >>>>>>>> last
> > >>>>>>>>>>>>>> committed
> > >>>>>>>>>>>>>>>>>>>> offset) of the changelog topics. Restoring data that
> > is
> > >>>>>>>> already
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> state store is idempotent, so EOS should not be violated.
> > >>>>>>>>>>>>>>>>>>>> This solution needs at least one new config to specify
> > >>>>>>>> when a
> > >>>>>>>>>>>>>>>> checkpoint
> > >>>>>>>>>>>>>>>>>>>> should be written.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> A small correction to your previous e-mail that does
> > not
> > >>>>>>>> change
> > >>>>>>>>>>>>>>>> anything
> > >>>>>>>>>>>>>>>>>>>> you said: Under ALOS the default commit interval is 30
> > >>>>>>>> seconds,
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>> five
> > >>>>>>>>>>>>>>>>>>>> seconds.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I've begun performance testing my branch on our
> > staging
> > >>>>>>>>>>>>>> environment,
> > >>>>>>>>>>>>>>>>>>>>> putting it through its paces in our non-trivial
> > >>>>>>>> application.
> > >>>>>>>>>> I'm
> > >>>>>>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>>> observing the same increased flush rate that we saw
> > the
> > >>>>>>>> last
> > >>>>>>>>>>>> time
> > >>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>> attempted to use a version of this KIP, but this
> > time,
> > >> I
> > >>>>>>>>>> think I
> > >>>>>>>>>>>>>> know
> > >>>>>>>>>>>>>>>>>>>> why.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called
> > at
> > >>>>>>>> the end
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> Task
> > >>>>>>>>>>>>>>>>>>>>> commit process, has the following behaviour:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>           - Under ALOS: checkpoint the state stores. This
> > >>>>>>>>>>>>>>>>>>>>>           includes flushing memtables in RocksDB. This is
> > >>>>>>>>>>>>>>>>>>>>>           acceptable because the default commit.interval.ms
> > >>>>>>>>>>>>>>>>>>>>>           is 5 seconds, so forcibly flushing memtables every
> > >>>>>>>>>>>>>>>>>>>>>           5 seconds is acceptable for most applications.
> > >>>>>>>>>>>>>>>>>>>>>           - Under EOS: checkpointing is not done, *unless*
> > >>>>>>>>>>>>>>>>>>>>>           it's being forced, due to e.g. the Task closing or
> > >>>>>>>>>>>>>>>>>>>>>           being revoked. This means that under normal
> > >>>>>>>>>>>>>>>>>>>>>           processing conditions, the state stores will not
> > >>>>>>>>>>>>>>>>>>>>>           be checkpointed, and will not have memtables
> > >>>>>>>>>>>>>>>>>>>>>           flushed at all, unless RocksDB decides to flush
> > >>>>>>>>>>>>>>>>>>>>>           them on its own. Checkpointing stores and
> > >>>>>>>>>>>>>>>>>>>>>           force-flushing their memtables is only done when a
> > >>>>>>>>>>>>>>>>>>>>>           Task is being closed.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at
> > >>>> least
> > >>>>>>>>>> *some*
> > >>>>>>>>>>>>>>>> normal
> > >>>>>>>>>>>>>>>>>>>>> Task commits, in order to write the RocksDB
> > transaction
> > >>>>>>>>>> buffers
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> state stores, and to ensure the offsets are synced to
> > >>>>>>>> disk to
> > >>>>>>>>>>>>>> prevent
> > >>>>>>>>>>>>>>>>>>>>> restores from getting out of hand. Consequently, my
> > >>>>>>>> current
> > >>>>>>>>>>>>>>>>>>>> implementation
> > >>>>>>>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task commit, which
> > is
> > >>>>>>>> far too
> > >>>>>>>>>>>>>>>>>>>> frequent.
> > >>>>>>>>>>>>>>>>>>>>> This causes checkpoints every 10,000 records, which
> > is
> > >> a
> > >>>>>>>>>> change
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>>>>> behaviour, potentially causing performance problems
> > for
> > >>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>> applications.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm
> > currently
> > >>>>>>>> leaning
> > >>>>>>>>>>>>>>>> towards
> > >>>>>>>>>>>>>>>>>>>>> using the statestore.transaction.buffer.max.bytes
> > >>>>>>>>>> configuration
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to exceed it.
> > This
> > >>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>> complement the
> > >>>>>>>>>>>>>>>>>>>>> existing "early Task commit" functionality that this
> > >>>>>>>>>>>> configuration
> > >>>>>>>>>>>>>>>>>>>>> provides, in the following way:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>           - Currently, we use
> > >>>>>>>>>>>>>>>>>>>>>           statestore.transaction.buffer.max.bytes to force
> > >>>>>>>>>>>>>>>>>>>>>           an early Task commit if processing more records
> > >>>>>>>>>>>>>>>>>>>>>           would cause our state store transactions to exceed
> > >>>>>>>>>>>>>>>>>>>>>           the memory assigned to them.
> > >>>>>>>>>>>>>>>>>>>>>           - New functionality: when a Task *does* commit, we
> > >>>>>>>>>>>>>>>>>>>>>           will not checkpoint the stores (and hence flush
> > >>>>>>>>>>>>>>>>>>>>>           the transaction buffers) unless we expect to cross
> > >>>>>>>>>>>>>>>>>>>>>           the statestore.transaction.buffer.max.bytes
> > >>>>>>>>>>>>>>>>>>>>>           threshold before the next commit
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm also open to suggestions.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> > >>>>>>>>>>>> nick.telf...@gmail.com
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Bruno!
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>> By "less predictable for users", I meant in terms of
> > >>>>>>>>>>>> understanding
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> performance profile under various circumstances. The
> > >>>>>>>> more
> > >>>>>>>>>>>> complex
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> solution, the more difficult it would be for users
> > to
> > >>>>>>>>>>>> understand
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> performance they see. For example, spilling records
> > to
> > >>>>>>>> disk
> > >>>>>>>>>>>> when
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> transaction buffer reaches a threshold would, I
> > >> expect,
> > >>>>>>>>>> reduce
> > >>>>>>>>>>>>>> write
> > >>>>>>>>>>>>>>>>>>>>>> throughput. This reduction in write throughput could
> > >> be
> > >>>>>>>>>>>>>> unexpected,
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> potentially difficult to diagnose/understand for
> > >> users.
> > >>>>>>>>>>>>>>>>>>>>>> At the moment, I think the "early commit" concept is
> > >>>>>>>>>> relatively
> > >>>>>>>>>>>>>>>>>>>>>> straightforward; it's easy to document, and
> > >>>> conceptually
> > >>>>>>>>>> fairly
> > >>>>>>>>>>>>>>>>>>>> obvious to
> > >>>>>>>>>>>>>>>>>>>>>> users. We could probably add a metric to make it
> > >> easier
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>> it happens though.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 3. (the second one)
> > >>>>>>>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an indirect way
> > of
> > >>>>>>>>>> telling
> > >>>>>>>>>>>>>>>>>>>> StateStores
> > >>>>>>>>>>>>>>>>>>>>>> whether they should be transactional. READ_COMMITTED
> > >>>>>>>>>>>> essentially
> > >>>>>>>>>>>>>>>>>>>> requires
> > >>>>>>>>>>>>>>>>>>>>>> transactions, because it dictates that two threads
> > >>>>>>>> calling
> > >>>>>>>>>>>>>>>>>>>>>> `newTransaction()` should not see writes from the
> > >> other
> > >>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>> until
> > >>>>>>>>>>>>>>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all
> > >>>>>>>> bets are
> > >>>>>>>>>>>> off,
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> stores can allow threads to observe written records
> > at
> > >>>>>>>> any
> > >>>>>>>>>>>> time,
> > >>>>>>>>>>>>>>>>>>>> which is
> > >>>>>>>>>>>>>>>>>>>>>> essentially "no transactions". That said,
> > StateStores
> > >>>>>>>> are
> > >>>>>>>>>> free
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> implement
> > >>>>>>>>>>>>>>>>>>>>>> these guarantees however they can, which is a bit
> > more
> > >>>>>>>>>> relaxed
> > >>>>>>>>>>>>>> than
> > >>>>>>>>>>>>>>>>>>>>>> dictating "you must use transactions". For example,
> > >>>> with
> > >>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>> implement these as READ_COMMITTED == WBWI-based
> > >>>>>>>>>> "transactions",
> > >>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the database.
> > But
> > >>>>>>>> with
> > >>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>> storage
> > >>>>>>>>>>>>>>>>>>>>>> engines, it might be preferable to *always* use
> > >>>>>>>> transactions,
> > >>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>> unnecessary; or there may be storage engines that
> > >> don't
> > >>>>>>>>>> provide
> > >>>>>>>>>>>>>>>>>>>>>> transactions, but the isolation guarantees can be
> > met
> > >>>>>>>> using a
> > >>>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>> technique.
> > >>>>>>>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore interface
> > as
> > >>>>>>>>>> loosely
> > >>>>>>>>>>>>>>>> coupled
> > >>>>>>>>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > >>>>>>>> implementers
> > >>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> freedom, and
> > >>>>>>>>>>>>>>>>>>>>>> reduce the amount of internal knowledge required.
> > >>>>>>>>>>>>>>>>>>>>>> That said, I understand that "IsolationLevel" might
> > >> not
> > >>>>>>>> be
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>> right
> > >>>>>>>>>>>>>>>>>>>>>> abstraction, and we can always make it much more
> > >>>>>>>> explicit if
> > >>>>>>>>>>>>>>>>>>>> required, e.g.
> > >>>>>>>>>>>>>>>>>>>>>> boolean transactional()
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 7-8.
> > >>>>>>>>>>>>>>>>>>>>>> I can make these changes either later today or
> > >>>> tomorrow.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Small update:
> > >>>>>>>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of
> > >>>>>>>> issues
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> needed
> > >>>>>>>>>>>>>>>>>>>>>> addressing. Currently, all the tests pass, which is
> > >>>>>>>>>> promising,
> > >>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>> need to undergo some performance testing. I haven't
> > >>>>>>>> (yet)
> > >>>>>>>>>>>> worked
> > >>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff, but I would
> > >>>>>>>> expect
> > >>>>>>>>>> that,
> > >>>>>>>>>>>>>>>>>>>>>> behaviourally, it should make no difference. The
> > >> branch
> > >>>>>>>> is
> > >>>>>>>>>>>>>> available
> > >>>>>>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c
> > >> if
> > >>>>>>>>>> anyone
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> interested in taking an early look.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
> > >>>>>>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was actually also my
> > >>>>>>>> point. I
> > >>>>>>>>>>>>>>>> understood
> > >>>>>>>>>>>>>>>>>>>>>>> that John was proposing the ingestion path as a way
> > >> to
> > >>>>>>>> avoid
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> early
> > >>>>>>>>>>>>>>>>>>>>>>> commits. Probably, I misinterpreted the intent.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>> I agree with John here, that actually it is public
> > >>>>>>>> API. My
> > >>>>>>>>>>>>>> question
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>> how this usage pattern affects normal processing.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>> My concern is that checking for the size of the
> > >>>>>>>> transaction
> > >>>>>>>>>>>>>> buffer
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>> maybe triggering an early commit affects the whole
> > >>>>>>>>>> processing
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>> Kafka
> > >>>>>>>>>>>>>>>>>>>>>>> Streams. The transactionality of a state store is
> > not
> > >>>>>>>>>>>> confined to
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> state store itself, but spills over and changes the
> > >>>>>>>> behavior
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>> parts of the system. I agree with you that it is a
> > >>>>>>>> decent
> > >>>>>>>>>>>>>>>>>>>> compromise. I
> > >>>>>>>>>>>>>>>>>>>>>>> just wanted to analyse the downsides and list the
> > >>>>>>>> options to
> > >>>>>>>>>>>>>>>> overcome
> > >>>>>>>>>>>>>>>>>>>>>>> them. I also agree with you that all options seem
> > >>>> quite
> > >>>>>>>>>> heavy
> > >>>>>>>>>>>>>>>>>>>> compared
> > >>>>>>>>>>>>>>>>>>>>>>> with your KIP. I do not understand what you mean
> > with
> > >>>>>>>> "less
> > >>>>>>>>>>>>>>>>>>>> predictable
> > >>>>>>>>>>>>>>>>>>>>>>> for users", though.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I found the discussions about the alternatives
> > really
> > >>>>>>>>>>>>>> interesting.
> > >>>>>>>>>>>>>>>>>>>> But I
> > >>>>>>>>>>>>>>>>>>>>>>> also think that your plan sounds good and we should
> > >>>>>>>> continue
> > >>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>> it!
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Some comments on your reply to my e-mail on June
> > >> 20th:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>> Ah, now, I understand the reasoning behind putting
> > >>>>>>>> isolation
> > >>>>>>>>>>>>>> level
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>> the state store context. Thanks! Should that also
> > be
> > >> a
> > >>>>>>>> way
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>> give
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> state store the opportunity to decide whether to
> > to
> > >>>>>>>> turn on
> > >>>>>>>>>>>>>>>>>>>>>>> transactions or not?
> > >>>>>>>>>>>>>>>>>>>>>>> With my comment, I was more concerned about how
> > >> you
> > >>>>>>>> know
> > >>>>>>>>>>>> if a
> > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file needs to be written under EOS, if
> > you
> > >>>>>>>> do not
> > >>>>>>>>>>>>>> have a
> > >>>>>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>>>>>> to know if the state store is transactional or not.
> > >> If
> > >>>>>>>> a
> > >>>>>>>>>> state
> > >>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>> transactional, the checkpoint file can be written
> > >>>>>>>> during
> > >>>>>>>>>>>> normal
> > >>>>>>>>>>>>>>>>>>>>>>> processing under EOS. If the state store is not
> > >>>>>>>>>> transactional,
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file must not be written under EOS.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 7.
> > >>>>>>>>>>>>>>>>>>>>>>> My point was about not only considering the bytes
> > in
> > >>>>>>>> memory
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>>>> config
> > >>>>>>>>>>>>>>>>>>>>>>> statestore.uncommitted.max.bytes, but also bytes
> > that
> > >>>>>>>> might
> > >>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>> spilled
> > >>>>>>>>>>>>>>>>>>>>>>> on disk. Basically, I was wondering whether you
> > >> should
> > >>>>>>>>>> remove
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> "memory" in "Maximum number of memory bytes to be
> > >> used
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> buffer uncommitted state-store records." My
> > thinking
> > >>>>>>>> was
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>> if a
> > >>>>>>>>>>>>>>>>>>>>>>> state store spills uncommitted bytes to disk,
> > >> limiting
> > >>>>>>>> the
> > >>>>>>>>>>>>>> overall
> > >>>>>>>>>>>>>>>>>>>> bytes
> > >>>>>>>>>>>>>>>>>>>>>>> might make sense. Thinking about it again and
> > >>>>>>>> considering
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>> recent
> > >>>>>>>>>>>>>>>>>>>>>>> discussions, it does not make too much sense
> > anymore.
> > >>>>>>>>>>>>>>>>>>>>>>> I like the name
> > >>>>>>>> statestore.transaction.buffer.max.bytes that
> > >>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>> proposed.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 8.
> > >>>>>>>>>>>>>>>>>>>>>>> A high-level description (without implementation
> > >>>>>>>> details) of
> > >>>>>>>>>>>> how
> > >>>>>>>>>>>>>>>>>>>> Kafka
> > >>>>>>>>>>>>>>>>>>>>>>> Streams will manage the commit of changelog
> > >>>>>>>> transactions,
> > >>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>> transactions and checkpointing would be great.
> > Would
> > >>>> be
> > >>>>>>>>>> great
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>>>>> could also add some sentences about the behavior in
> > >>>>>>>> case of
> > >>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>> failure.
> > >>>>>>>>>>>>>>>>>>>>>>> For instance how does a transactional state store
> > >>>>>>>> recover
> > >>>>>>>>>>>> after a
> > >>>>>>>>>>>>>>>>>>>>>>> failure or what happens with the transaction
> > buffer,
> > >>>>>>>> etc.
> > >>>>>>>>>>>> (that
> > >>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>>>>>>>>>> I meant by "fail-over" in point 9.)
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that
> > >>>>>>>> WriteBatchWithIndex
> > >>>>>>>>>>>>>>>>>>>> transactions
> > >>>>>>>>>>>>>>>>>>>>>>>> have, whereby exceeding (or likely to exceed)
> > >>>>>>>> configured
> > >>>>>>>>>>>> memory
> > >>>>>>>>>>>>>>>>>>>> needs to
> > >>>>>>>>>>>>>>>>>>>>>>>> trigger an early commit?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>>> This is one of my big concerns. Ultimately, any
> > >>>>>>>> approach
> > >>>>>>>>>>>> based
> > >>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>>> cracking
> > >>>>>>>>>>>>>>>>>>>>>>>> open RocksDB internals and using it in ways it's
> > not
> > >>>>>>>> really
> > >>>>>>>>>>>>>>>> designed
> > >>>>>>>>>>>>>>>>>>>>>>> for is
> > >>>>>>>>>>>>>>>>>>>>>>>> likely to have some unforeseen performance or
> > >>>>>>>> consistency
> > >>>>>>>>>>>> issues.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>> What's your motivation for removing these early
> > >>>>>>>> commits?
> > >>>>>>>>>>>> While
> > >>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>> ideal, I
> > >>>>>>>>>>>>>>>>>>>>>>>> think they're a decent compromise to ensure
> > >>>>>>>> consistency
> > >>>>>>>>>>>> whilst
> > >>>>>>>>>>>>>>>>>>>>>>> maintaining
> > >>>>>>>>>>>>>>>>>>>>>>>> good and predictable performance.
> > >>>>>>>>>>>>>>>>>>>>>>>> All 3 of your suggested ideas seem *very*
> > >>>>>>>> complicated, and
> > >>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>>>>>> make behaviour less predictable for users as a
> > >>>>>>>> consequence.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is
> > >>>>>>>> growing a
> > >>>>>>>>>>>> bit
> > >>>>>>>>>>>>>>>> out
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> control. While it's good to discuss ideas for
> > future
> > >>>>>>>>>>>>>>>> improvements, I
> > >>>>>>>>>>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>>>> it's important to narrow the scope down to a
> > design
> > >>>>>>>> that
> > >>>>>>>>>>>>>> achieves
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> most
> > >>>>>>>>>>>>>>>>>>>>>>>> pressing objectives (constant sized restorations
> > >>>>>>>> during
> > >>>>>>>>>> dirty
> > >>>>>>>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design that this KIP
> > >>>>>>>> produces
> > >>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>> ultimately
> > >>>>>>>>>>>>>>>>>>>>>>>> be changed in the future, especially if the bulk
> > of
> > >>>>>>>> it is
> > >>>>>>>>>>>>>> internal
> > >>>>>>>>>>>>>>>>>>>>>>>> behaviour.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week trying to
> > >>>>>>>> re-work
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> original
> > >>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove the
> > >>>>>>>> newTransaction()
> > >>>>>>>>>>>>>> method,
> > >>>>>>>>>>>>>>>>>>>> such
> > >>>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>> it's just an implementation detail of
> > RocksDBStore.
> > >>>>>>>> That
> > >>>>>>>>>>>> way, if
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>> want to
> > >>>>>>>>>>>>>>>>>>>>>>>> replace WBWI with something in the future, like
> > the
> > >>>>>>>> SST
> > >>>>>>>>>> file
> > >>>>>>>>>>>>>>>>>>>> management
> > >>>>>>>>>>>>>>>>>>>>>>>> outlined by John, then we can do so with little/no
> > >>>> API
> > >>>>>>>>>>>> changes.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
