I agree that as long as we are aligned on the configuration semantics,
that would be a big win for moving this KIP forward. As for the
TaskCorruptedException handling, like wiping state stores, we can
discuss that in the PR rather than in the KIP.

Just to clarify: I'm on board with the latest proposal, so can we move
on to voting on this KIP now?

Guozhang

On Thu, Oct 19, 2023 at 5:33 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Nick,
>
> What you and Lucas wrote about the different configurations of ALOS/EOS
> and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier
> concerns about changelogs diverging from the content of the local state
> stores turned out to not apply. So I think, we can move on with those
> configurations.
>
> Regarding the TaskCorruptedException and wiping out the state stores
> under EOS, couldn't we abort the transaction on the state store and
> close the task dirty? If the Kafka transaction was indeed committed, the
> store would restore the missing part from the changelog topic. If the
> Kafka transaction was not committed, changelog topic and state store are
> in-sync.
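>
> A rough sketch of what I mean (all method names here are assumptions,
> not actual Streams internals):
>
> // Hedged sketch of the recovery path under EOS with a transactional
> // store; recover() and the store/task methods are hypothetical.
> void recover(Task task, TransactionalStateStore store) {
>     store.abortTransaction(); // discard uncommitted local writes
>     task.closeDirty();        // close without checkpointing
>     // On restart, restoration replays the changelog from the last
>     // committed offset: if the Kafka transaction did commit, the
>     // missing writes are restored; if it aborted, store and changelog
>     // already agree, so nothing needs to be wiped.
> }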
>
> In any case, IMO those are implementation details that we do not need to
> discuss and solve in the KIP discussion. We can solve them on the PR.
> The important thing is that the processing guarantees hold.
>
> Best,
> Bruno
>
> On 10/18/23 3:56 PM, Nick Telford wrote:
> > Hi Lucas,
> >
> > TaskCorruptedException is how Streams signals that the Task state needs to
> > be wiped, so we can't retain that exception without also wiping state on
> > timeouts.
> >
> > Regards,
> > Nick
> >
> > On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy 
> > <lbruts...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Nick,
> >>
> >> I think indeed the better behavior would be to retry commitTransaction
> >> until we risk running out of time to meet `max.poll.interval.ms`.
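> >>
> >> Roughly something like this sketch (names and the surrounding error
> >> handling are assumptions, not actual Streams code):
> >>
> >> // Retry commitTransaction with a deadline derived from
> >> // max.poll.interval.ms, minus some safety margin.
> >> long deadline = System.currentTimeMillis() + maxPollIntervalMs - margin;
> >> while (true) {
> >>     try {
> >>         producer.commitTransaction();
> >>         break;
> >>     } catch (org.apache.kafka.common.errors.TimeoutException e) {
> >>         if (System.currentTimeMillis() >= deadline) {
> >>             throw e; // out of time: surface the error instead of retrying
> >>         }
> >>         // otherwise loop: commitTransaction may be called again after
> >>         // a timeout and will resume the pending commit
> >>     }
> >> }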
> >>
> >> However, if it's handled as a `TaskCorruptedException` at the moment,
> >> I would do the same in this KIP, and leave exception handling
> >> improvements to future work. This KIP is already improving the
> >> situation a lot by not wiping the state store.
> >>
> >> Cheers,
> >> Lucas
> >>
> >> On Tue, Oct 17, 2023 at 3:51 PM Nick Telford <nick.telf...@gmail.com>
> >> wrote:
> >>>
> >>> Hi Lucas,
> >>>
> >>> Yeah, this is pretty much the direction I'm thinking of going in now. You
> >>> make an interesting point about committing on-error under
> >>> ALOS/READ_COMMITTED, although I haven't had a chance to think through the
> >>> implications yet.
> >>>
> >>> Something that I ran into earlier this week is an issue with the new
> >>> handling of TimeoutException. Without TX stores, TimeoutException under
> >> EOS
> >>> throws a TaskCorruptedException, which wipes the stores. However, with TX
> >>> stores, TimeoutException is now just bubbled up and dealt with as it is
> >>> under ALOS. The problem arises when the Producer#commitTransaction call
> >>> times out: Streams attempts to ignore the error and continue producing,
> >>> which causes the next call to Producer#send to throw
> >>> "IllegalStateException: Cannot attempt operation `send` because the
> >>> previous call to `commitTransaction` timed out and must be retried".
> >>>
> >>> I'm not sure what we should do here: retrying the commitTransaction seems
> >>> logical, but what if it times out again? Where do we draw the line and
> >>> shut down the instance?
> >>>
> >>> Regards,
> >>> Nick
> >>>
> >>> On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy <lbruts...@confluent.io
> >> .invalid>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I think I like your suggestion of allowing EOS with READ_UNCOMMITTED
> >>>> while keeping the state-wiping on error, and I'd vote for this solution
> >>>> when introducing `default.state.isolation.level`. This way, we'd have
> >>>> the most low-risk roll-out of this feature (no behavior change without
> >>>> reconfiguration), with the possibility of switching to the most sane /
> >>>> battle-tested default settings in 4.0. Essentially, we'd have a
> >>>> feature flag but call it `default.state.isolation.level`, and won't
> >>>> have to deprecate it later.
> >>>>
> >>>> So the possible configurations would then be this:
> >>>>
> >>>> 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> >>>> reads from DB.
> >>>> 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> >>>> WriteBatch/DB. Flush on error (see note below).
> >>>> 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> >>>> reads from DB. Wipe state on error.
> >>>> 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> >>>> WriteBatch/DB.
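> >>>>
> >>>> In StreamsConfig terms, e.g. (a sketch; the isolation-level key and
> >>>> values are the ones proposed in the KIP):
> >>>>
> >>>> Properties props = new Properties();
> >>>> // configuration #4: EOS + READ_COMMITTED
> >>>> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>>>           StreamsConfig.EXACTLY_ONCE_V2);
> >>>> props.put("default.state.isolation.level", "READ_COMMITTED");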
> >>>>
> >>>> I believe the feature is important enough that we will see good
> >>>> adoption even without changing the default. In 4.0, once it has been
> >>>> adopted and battle-tested, we make READ_COMMITTED the default for EOS,
> >>>> or even READ_COMMITTED always the default, depending on our
> >>>> experiences. And we could add a clever implementation of
> >>>> READ_UNCOMMITTED with WriteBatches later.
> >>>>
> >>>> The only smell here is that `default.state.isolation.level` wouldn't
> >>>> be purely an IQ setting, but it would also (slightly) change the
> >>>> behavior of the processing. But that seems unavoidable as long as we
> >>>> haven't solved READ_UNCOMMITTED IQ with WriteBatches.
> >>>>
> >>>> Minor: As for Bruno's point 4, I think if we are concerned about this
> >>>> behavior (we don't necessarily have to be, because it doesn't violate
> >>>> ALOS guarantees as far as I can see), we could make
> >>>> ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing
> >>>> the WriteBatch on error (obviously, only if we have a chance to do
> >>>> that).
> >>>>
> >>>> Cheers,
> >>>> Lucas
> >>>>
> >>>> On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi Guozhang,
> >>>>>
> >>>>> The KIP as it stands introduces a new configuration,
> >>>>> default.state.isolation.level, which is independent of
> >> processing.mode.
> >>>>> It's intended that this new configuration be used to configure a
> >> global
> >>>> IQ
> >>>>> isolation level in the short term, with a future KIP introducing the
> >>>>> capability to change the isolation level on a per-query basis,
> >> falling
> >>>> back
> >>>>> to the "default" defined by this config. That's why I called it
> >>>> "default",
> >>>>> for future-proofing.
> >>>>>
> >>>>> However, it currently includes the caveat that READ_UNCOMMITTED is
> >> not
> >>>>> available under EOS. I think this is the coupling you are alluding
> >> to?
> >>>>>
> >>>>> This isn't intended to be a restriction of the API, but is currently
> >> a
> >>>>> technical limitation. However, after discussing with some users about
> >>>>> use-cases that would require READ_UNCOMMITTED under EOS, I'm
> >> inclined to
> >>>>> remove that clause and put in the necessary work to make that
> >> combination
> >>>>> possible now.
> >>>>>
> >>>>> I currently see two possible approaches:
> >>>>>
> >>>>>     1. Disable TX StateStores internally when the IsolationLevel is
> >>>>>     READ_UNCOMMITTED and the processing.mode is EOS. This is more
> >>>> difficult
> >>>>>     than it sounds, as there are many assumptions being made
> >> throughout
> >>>> the
> >>>>>     internals about the guarantees StateStores provide. It would
> >>>> definitely add
> >>>>>     a lot of extra "if (read_uncommitted && eos)" branches,
> >> complicating
> >>>>>     maintenance and testing.
> >>>>>     2. Invest the time *now* to make READ_UNCOMMITTED of EOS
> >> StateStores
> >>>>>     possible. I have some ideas on how this could be achieved, but
> >> they
> >>>> would
> >>>>>     need testing and could introduce some additional issues. The
> >> benefit
> >>>> of
> >>>>>     this approach is that it would make query-time IsolationLevels
> >> much
> >>>> simpler
> >>>>>     to implement in the future.
> >>>>>
> >>>>> Unfortunately, both will require considerable work that will further
> >>>> delay
> >>>>> this KIP, which was the reason I placed the restriction in the KIP
> >> in the
> >>>>> first place.
> >>>>>
> >>>>> Regards,
> >>>>> Nick
> >>>>>
> >>>>> On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <
> >> guozhang.wang...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Nick,
> >>>>>>
> >>>>>> First of all, thanks a lot for the great effort you've put in
> >> driving
> >>>>>> this KIP! I'm really glad it's finally coming through, as many people
> >> in
> >>>>>> the community have raised this. At the same time I honestly feel a
> >> bit
> >>>>>> ashamed for not putting enough of my time supporting it and
> >> pushing it
> >>>>>> through the finish line (you raised this KIP almost a year ago).
> >>>>>>
> >>>>>> I briefly passed through the DISCUSS thread so far, not sure I've
> >> 100
> >>>>>> percent digested all the bullet points. But with the goal of
> >> trying to
> >>>>>> help take it through the finish line in mind, I'd like to throw out
> >>>>>> some off-the-top-of-my-head thoughts on point #4 above, which I felt
> >>>>>> may be the main hurdle for the current KIP to reach consensus now.
> >>>>>>
> >>>>>> The general question I asked myself is whether we want to couple "IQ
> >>>>>> reading mode" with "processing mode". While technically I tend to
> >>>>>> agree with you that it feels like a bug if a single user chose "EOS"
> >>>>>> for the processing mode while choosing "read uncommitted" for the IQ
> >>>>>> reading mode, at the same time I'm thinking it's possible that there
> >>>>>> could be two different people (or even two teams) using the Streams
> >>>>>> API to build the app and the IQ API to query the running state of
> >>>>>> the app. I know this is less of a technical question and more of a
> >>>>>> design one, but if that could ever be the case, I wonder whether the
> >>>>>> person using the IQ API knows about the risks of reading uncommitted
> >>>>>> but still chooses to do so in favor of performance, regardless of
> >>>>>> whether the underlying stream processing mode configured by the
> >>>>>> other person is EOS or not. In that regard, I'm leaning towards
> >>>>>> "leaving the door open, and closing it later if we find it's a bad
> >>>>>> idea" with a configuration that we can potentially deprecate, rather
> >>>>>> than "shut the door, clean for everyone". More specifically: allow
> >>>>>> the processing mode / IQ read mode to be decoupled, and if we find
> >>>>>> that there are no such cases as I speculated above, or people start
> >>>>>> complaining a lot, we can still enforce coupling them.
> >>>>>>
> >>>>>> Again, just my 2c here. Thanks again for the great patience and
> >>>>>> diligence on this KIP.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <
> >> nick.telf...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> 4.
> >>>>>>> I'll hold off on making that change until we have a consensus as
> >> to
> >>>> what
> >>>>>>> configuration to use to control all of this, as it'll be
> >> affected by
> >>>> the
> >>>>>>> decision on EOS isolation levels.
> >>>>>>>
> >>>>>>> 5.
> >>>>>>> Done. I've chosen "committedOffsets".
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>> On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Nick,
> >>>>>>>>
> >>>>>>>> 1.
> >>>>>>>> Yeah, you are probably right that it does not make too much
> >> sense.
> >>>>>>>> Thanks for the clarification!
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 4.
> >>>>>>>> Yes, sorry for the back and forth, but I think for the sake of
> >> the
> >>>> KIP
> >>>>>>>> it is better to leave the ALOS behavior as it is for now due to
> >> the
> >>>>>>>> possible issues you would run into. Maybe we can find a
> >> solution
> >>>> in the
> >>>>>>>> future. Now the question returns to whether we really need
> >>>>>>>> default.state.isolation.level. Maybe the config could be the
> >>>> feature
> >>>>>>>> flag Sophie requested.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 5.
> >>>>>>>> There is a guideline in Kafka not to use the get prefix for
> >>>> getters (at
> >>>>>>>> least in the public API). Thus, could you please rename
> >>>>>>>>
> >>>>>>>> getCommittedOffset(TopicPartition partition) ->
> >>>>>>>> committedOffsetFor(TopicPartition partition)
> >>>>>>>>
> >>>>>>>> You can also propose an alternative to committedOffsetFor().
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/13/23 3:21 PM, Nick Telford wrote:
> >>>>>>>>> Hi Bruno,
> >>>>>>>>>
> >>>>>>>>> Thanks for getting back to me.
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> I think this should be possible. Are you thinking of the
> >>>> situation
> >>>>>> where
> >>>>>>>> a
> >>>>>>>>> user may downgrade to a previous version of Kafka Streams? In
> >>>> that
> >>>>>> case,
> >>>>>>>>> sadly, the RocksDBStore would get wiped by the older version
> >> of
> >>>> Kafka
> >>>>>>>>> Streams anyway, because that version wouldn't understand the
> >>>> extra
> >>>>>> column
> >>>>>>>>> family (that holds offsets), so the missing Position file
> >> would
> >>>>>>>>> automatically get rebuilt when the store is rebuilt from the
> >>>>>> changelog.
> >>>>>>>>> Are there other situations than downgrade where a
> >> transactional
> >>>> store
> >>>>>>>> could
> >>>>>>>>> be replaced by a non-transactional one? I can't think of any.
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> Ahh yes, the Test Plan - my Kryptonite! This section
> >> definitely
> >>>>>> needs to
> >>>>>>>> be
> >>>>>>>>> fleshed out. I'll work on that. How much detail do you need?
> >>>>>>>>>
> >>>>>>>>> 3.
> >>>>>>>>> See my previous email discussing this.
> >>>>>>>>>
> >>>>>>>>> 4.
> >>>>>>>>> Hmm, this is an interesting point. Are you suggesting that
> >> under
> >>>> ALOS
> >>>>>>>>> READ_COMMITTED should not be supported?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <
> >> cado...@apache.org>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Nick,
> >>>>>>>>>>
> >>>>>>>>>> I think the KIP is converging!
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1.
> >>>>>>>>>> I am wondering whether it makes sense to write the position
> >> file
> >>>>>> during
> >>>>>>>>>> close as we do for the checkpoint file, so that in case the
> >>>> state
> >>>>>> store
> >>>>>>>>>> is replaced with a non-transactional state store the
> >>>>>> non-transactional
> >>>>>>>>>> state store finds the position file. I think this is not
> >>>> strictly
> >>>>>>>>>> needed, but would be a nice behavior instead of just
> >> deleting
> >>>> the
> >>>>>>>>>> position file.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2.
> >>>>>>>>>> The test plan does not mention integration tests. Do you not
> >>>> need to
> >>>>>>>>>> extend existing ones and add new ones? Also, for upgrading
> >> and
> >>>>>>>>>> downgrading you might need integration and/or system tests.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3.
> >>>>>>>>>> I think Sophie made a point. That said, IQ reading from
> >>>> uncommitted
> >>>>>> data
> >>>>>>>>>> under EOS might be considered a bug by some people. Thus,
> >> your
> >>>> KIP
> >>>>>> would
> >>>>>>>>>> fix a bug rather than changing the intended behavior.
> >> However, I
> >>>>>> also
> >>>>>>>>>> see that a feature flag would help users that rely on this
> >> buggy
> >>>>>>>>>> behavior (at least until AK 4.0).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 4.
> >>>>>>>>>> This is related to the previous point. I assume that the
> >>>> difference
> >>>>>>>>>> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is
> >> that in
> >>>> the
> >>>>>>>>>> former you enable transactions on the state store and in the
> >>>> latter
> >>>>>> you
> >>>>>>>>>> disable them. If my assumption is correct, I think that is
> >> an
> >>>> issue.
> >>>>>>>>>> Let's assume under ALOS Streams fails over a couple of times
> >>>> more or
> >>>>>>>>>> less at the same step in processing after value 3 is added
> >> to an
> >>>>>>>>>> aggregation but the offset of the corresponding input record
> >>>> was not
> >>>>>>>>>> committed. Without transactions, the aggregation
> >> value
> >>>>>> would
> >>>>>>>>>> increase by 3 for each failover. With transactions enabled,
> >>>> value 3
> >>>>>>>>>> would only be added to the aggregation once when the offset
> >> of
> >>>> the
> >>>>>> input
> >>>>>>>>>> record is committed and the transaction finally completes.
> >> So
> >>>> the
> >>>>>>>>>> content of the state store would change depending on the
> >>>>>> configuration
> >>>>>>>>>> for IQ. IMO, the content of the state store should be
> >>>> independent
> >>>>>> from
> >>>>>>>>>> IQ. Given this issue, I propose to not use transactions with
> >>>> ALOS at
> >>>>>>>>>> all. I was a big proponent of using transactions with ALOS,
> >> but
> >>>> I
> >>>>>>>>>> realized that transactions with ALOS is not as easy as
> >> enabling
> >>>>>>>>>> transactions on state stores. Another aspect that is
> >>>> problematic is
> >>>>>> that
> >>>>>>>>>> the changelog topic which actually replicates the state
> >> store
> >>>> is not
> >>>>>>>>>> transactional under ALOS. Thus, it might happen that the
> >> state
> >>>>>> store and
> >>>>>>>>>> the changelog differ in their content. All of this is maybe
> >>>> solvable
> >>>>>>>>>> somehow, but for the sake of this KIP, I would leave it for
> >> the
> >>>>>> future.
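> >>>>>>>>>>
> >>>>>>>>>> To make the failover arithmetic concrete, a small illustration
> >>>>>>>>>> (plain Java, not Streams code):
> >>>>>>>>>>
> >>>>>>>>>> // ALOS without transactions: the store write survives the
> >>>>>>>>>> // crash, but the input offset was not committed, so the same
> >>>>>>>>>> // record is re-applied after each failover.
> >>>>>>>>>> long aggregate = 0;
> >>>>>>>>>> for (int attempt = 0; attempt < 3; attempt++) {
> >>>>>>>>>>     aggregate += 3; // persisted write
> >>>>>>>>>>     // crash before the offset commit => reprocess the record
> >>>>>>>>>> }
> >>>>>>>>>> // aggregate == 9 instead of 3; with transactions the
> >>>>>>>>>> // uncommitted writes are discarded on failure, so value 3 is
> >>>>>>>>>> // added exactly once per committed offset.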
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
> >>>>>>>>>>> Hey Nick! First of all thanks for taking up this awesome
> >>>> feature,
> >>>>>> I'm
> >>>>>>>>>> sure
> >>>>>>>>>>> every single
> >>>>>>>>>>> Kafka Streams user and dev would agree that it is sorely
> >>>> needed.
> >>>>>>>>>>>
> >>>>>>>>>>> I've just been catching up on the KIP and surrounding
> >>>> discussion,
> >>>>>> so
> >>>>>>>>>> please
> >>>>>>>>>>> forgive me
> >>>>>>>>>>> for any misunderstandings or misinterpretations of the
> >> current
> >>>>>> plan and
> >>>>>>>>>>> don't hesitate to
> >>>>>>>>>>> correct me.
> >>>>>>>>>>>
> >>>>>>>>>>> Before I jump in, I just want to say that having seen this
> >>>> drag on
> >>>>>> for
> >>>>>>>> so
> >>>>>>>>>>> long, my singular
> >>>>>>>>>>> goal in responding is to help this KIP past a perceived
> >>>> impasse so
> >>>>>> we
> >>>>>>>> can
> >>>>>>>>>>> finally move on
> >>>>>>>>>>> to voting and implementing it. Long discussions are to be
> >>>> expected
> >>>>>> for
> >>>>>>>>>>> major features like
> >>>>>>>>>>> this but it's completely on us as the Streams devs to make
> >> sure
> >>>>>> there
> >>>>>>>> is
> >>>>>>>>>> an
> >>>>>>>>>>> end in sight
> >>>>>>>>>>> for any ongoing discussion.
> >>>>>>>>>>>
> >>>>>>>>>>> With that said, it's my understanding that the KIP as
> >> currently
> >>>>>>>> proposed
> >>>>>>>>>> is
> >>>>>>>>>>> just not tenable
> >>>>>>>>>>> for Kafka Streams, and would prevent some EOS users from
> >>>> upgrading
> >>>>>> to
> >>>>>>>> the
> >>>>>>>>>>> version it
> >>>>>>>>>>> first appears in. Given that we can't predict or guarantee
> >>>> whether
> >>>>>> any
> >>>>>>>> of
> >>>>>>>>>>> the followup KIPs
> >>>>>>>>>>> would be completed in the same release cycle as this one,
> >> we
> >>>> need
> >>>>>> to
> >>>>>>>> make
> >>>>>>>>>>> sure that the
> >>>>>>>>>>> feature is either compatible with all current users or else
> >>>>>>>>>> feature-flagged
> >>>>>>>>>>> so that they may
> >>>>>>>>>>> opt in/out.
> >>>>>>>>>>>
> >>>>>>>>>>> Therefore, IIUC we need to have either (or both) of these
> >> as
> >>>>>>>>>>> fully-implemented config options:
> >>>>>>>>>>> 1. default.state.isolation.level
> >>>>>>>>>>> 2. enable.transactional.state.stores
> >>>>>>>>>>>
> >>>>>>>>>>> This way EOS users for whom read_committed semantics are
> >> not
> >>>>>> viable can
> >>>>>>>>>>> still upgrade,
> >>>>>>>>>>> and either use the isolation.level config to leverage the
> >> new
> >>>> txn
> >>>>>> state
> >>>>>>>>>>> stores without sacrificing
> >>>>>>>>>>> their application semantics, or else simply keep the
> >>>> transactional
> >>>>>>>> state
> >>>>>>>>>>> stores disabled until we
> >>>>>>>>>>> are able to fully implement the isolation level
> >> configuration
> >>>> at
> >>>>>> either
> >>>>>>>>>> an
> >>>>>>>>>>> application or query level.
> >>>>>>>>>>>
> >>>>>>>>>>> Frankly you are the expert here and know much more about
> >> the
> >>>>>> tradeoffs
> >>>>>>>> in
> >>>>>>>>>>> both semantics and
> >>>>>>>>>>> effort level of implementing one of these configs vs the
> >>>> other. In
> >>>>>> my
> >>>>>>>>>>> opinion, either option would
> >>>>>>>>>>> be fine and I would leave the decision of which one to
> >> include
> >>>> in
> >>>>>> this
> >>>>>>>>>> KIP
> >>>>>>>>>>> completely up to you.
> >>>>>>>>>>> I just don't see a way for the KIP to proceed without some
> >>>>>> variation of
> >>>>>>>>>> the
> >>>>>>>>>>> above that would allow
> >>>>>>>>>>> EOS users to opt-out of read_committed.
> >>>>>>>>>>>
> >>>>>>>>>>> (If it's all the same to you, I would recommend always
> >>>> including a
> >>>>>>>>>> feature
> >>>>>>>>>>> flag in large structural
> >>>>>>>>>>> changes like this. No matter how much I trust someone or
> >>>> myself to
> >>>>>>>>>>> implement a feature, you just
> >>>>>>>>>>> never know what kind of bugs might slip in, especially
> >> with the
> >>>>>> very
> >>>>>>>>>> first
> >>>>>>>>>>> iteration that gets released.
> >>>>>>>>>>> So personally, my choice would be to add the feature flag
> >> and
> >>>>>> leave it
> >>>>>>>>>> off
> >>>>>>>>>>> by default. If all goes well
> >>>>>>>>>>> you can do a quick KIP to enable it by default as soon as
> >> the
> >>>>>>>>>>> isolation.level config has been
> >>>>>>>>>>> completed. But feel free to just pick whichever option is
> >>>> easiest
> >>>>>> or
> >>>>>>>>>>> quickest for you to implement)
> >>>>>>>>>>>
> >>>>>>>>>>> Hope this helps move the discussion forward,
> >>>>>>>>>>> Sophie
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <
> >>>>>> nick.telf...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Agreed, I can live with that for now.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In an effort to keep the scope of this KIP from
> >> expanding, I'm
> >>>>>> leaning
> >>>>>>>>>>>> towards just providing a configurable
> >>>>>> default.state.isolation.level
> >>>>>>>> and
> >>>>>>>>>>>> removing IsolationLevel from the StateStoreContext. This
> >>>> would be
> >>>>>>>>>>>> compatible with adding support for query-time
> >> IsolationLevels
> >>>> in
> >>>>>> the
> >>>>>>>>>>>> future, whilst providing a way for users to select an
> >>>> isolation
> >>>>>> level
> >>>>>>>>>> now.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The big problem with this, however, is that if a user
> >> selects
> >>>>>>>>>>>> processing.mode
> >>>>>>>>>>>> = "exactly-once(-v2|-beta)", and
> >>>> default.state.isolation.level =
> >>>>>>>>>>>> "READ_UNCOMMITTED", we need to guarantee that the data
> >> isn't
> >>>>>> written
> >>>>>>>> to
> >>>>>>>>>>>> disk until commit() is called, but we also need to permit
> >> IQ
> >>>>>> threads
> >>>>>>>> to
> >>>>>>>>>>>> read from the ongoing transaction.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A simple solution would be to (temporarily) forbid this
> >>>>>> combination of
> >>>>>>>>>>>> configuration, and have default.state.isolation.level
> >>>>>> automatically
> >>>>>>>>>> switch
> >>>>>>>>>>>> to READ_COMMITTED when processing.mode is anything other
> >> than
> >>>>>>>>>>>> at-least-once. Do you think this would be acceptable?
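> >>>>>>>>>>>>
> >>>>>>>>>>>> Conceptually the fallback would be something like this sketch
> >>>>>>>>>>>> (not actual config-resolution code):
> >>>>>>>>>>>>
> >>>>>>>>>>>> IsolationLevel effectiveLevel(String guarantee,
> >>>>>>>>>>>>                               IsolationLevel configured) {
> >>>>>>>>>>>>     // force READ_COMMITTED for anything other than ALOS
> >>>>>>>>>>>>     if (!StreamsConfig.AT_LEAST_ONCE.equals(guarantee)) {
> >>>>>>>>>>>>         return IsolationLevel.READ_COMMITTED;
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>     return configured;
> >>>>>>>>>>>> }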
> >>>>>>>>>>>>
> >>>>>>>>>>>> In a later KIP, we can add support for query-time
> >> isolation
> >>>>>> levels and
> >>>>>>>>>>>> solve this particular problem there, which would relax
> >> this
> >>>>>>>> restriction.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Nick
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <
> >>>> cado...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Why do we need to add READ_COMMITTED to
> >>>> InMemoryKeyValueStore? I
> >>>>>>>> think
> >>>>>>>>>>>>> it is perfectly valid to say InMemoryKeyValueStore does not
> >>>> support
> >>>>>>>>>>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the
> >>>> de-facto
> >>>>>>>> default
> >>>>>>>>>>>>> at the moment.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 9/18/23 7:12 PM, Nick Telford wrote:
> >>>>>>>>>>>>>> Oh! One other concern I haven't mentioned: if we make
> >>>>>>>> IsolationLevel a
> >>>>>>>>>>>>>> query-time constraint, then we need to add support for
> >>>>>>>> READ_COMMITTED
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> InMemoryKeyValueStore too, which will require some
> >> changes
> >>>> to
> >>>>>> the
> >>>>>>>>>>>>>> implementation.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford <
> >>>>>> nick.telf...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I agree that having IsolationLevel be determined at
> >>>> query-time
> >>>>>> is
> >>>>>>>> the
> >>>>>>>>>>>>>>> ideal design, but there are a few sticking points:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>> There needs to be some way to communicate the
> >>>> IsolationLevel
> >>>>>> down
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> RocksDBStore itself, so that the query can respect it.
> >>>> Since
> >>>>>> stores
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>> "layered" in functionality (i.e. ChangeLoggingStore,
> >>>>>> MeteredStore,
> >>>>>>>>>>>>> etc.),
> >>>>>>>>>>>>>>> we need some way to deliver that information to the
> >> bottom
> >>>>>> layer.
> >>>>>>>> For
> >>>>>>>>>>>>> IQv2,
> >>>>>>>>>>>>>>> we can use the existing State#query() method, but IQv1
> >> has
> >>>> no
> >>>>>> way
> >>>>>>>> to
> >>>>>>>>>>>> do
> >>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A simple approach, which would potentially open up
> >> other
> >>>>>> options,
> >>>>>>>>>>>> would
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> to add something like: ReadOnlyKeyValueStore<K, V>
> >>>>>>>>>>>>>>> readOnlyView(IsolationLevel isolationLevel) to
> >>>>>>>> ReadOnlyKeyValueStore
> >>>>>>>>>>>>> (and
> >>>>>>>>>>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore,
> >>>> etc.).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>> As mentioned above, RocksDB WriteBatches are not
> >>>> thread-safe,
> >>>>>> which
> >>>>>>>>>>>>> causes
> >>>>>>>>>>>>>>> a problem if we want to provide READ_UNCOMMITTED
> >>>> Iterators. I
> >>>>>> also
> >>>>>>>>>>>> had a
> >>>>>>>>>>>>>>> look at RocksDB Transactions[1], but they solve a very
> >>>>>> different
> >>>>>>>>>>>>> problem,
> >>>>>>>>>>>>>>> and have the same thread-safety issue.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> One possible approach that I mentioned is chaining
> >>>>>> WriteBatches:
> >>>>>>>>>> every
> >>>>>>>>>>>>>>> time a new Interactive Query is received (i.e.
> >>>> readOnlyView,
> >>>>>> see
> >>>>>>>>>>>> above,
> >>>>>>>>>>>>>>> is called) we "freeze" the existing WriteBatch, and
> >> start a
> >>>>>> new one
> >>>>>>>>>>>> for
> >>>>>>>>>>>>> new
> >>>>>>>>>>>>>>> writes. The Interactive Query queries the "chain" of
> >>>> previous
> >>>>>>>>>>>>> WriteBatches
> >>>>>>>>>>>>>>> + the underlying database; while the StreamThread
> >> starts
> >>>>>> writing to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> *new* WriteBatch. On-commit, the StreamThread would
> >> write
> >>>> *all*
> >>>>>>>>>>>>>>> WriteBatches in the chain to the database (that have
> >> not
> >>>> yet
> >>>>>> been
> >>>>>>>>>>>>> written).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> WriteBatches would be closed/freed only when they have
> >> been
> >>>>>> both
> >>>>>>>>>>>>>>> committed, and all open Interactive Queries on them
> >> have
> >>>> been
> >>>>>>>> closed.
> >>>>>>>>>>>>> This
> >>>>>>>>>>>>>>> would require some reference counting.
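> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In outline, assuming RocksDB's WriteBatchWithIndex (the
> >>>>>>>>>>>>>>> chain/refcount structure here is illustrative only):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Deque<WriteBatchWithIndex> frozen = new ArrayDeque<>();
> >>>>>>>>>>>>>>> WriteBatchWithIndex active = new WriteBatchWithIndex();
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> // a new Interactive Query arrives: freeze the current
> >>>>>>>>>>>>>>> // batch and start a new one for subsequent writes
> >>>>>>>>>>>>>>> frozen.addFirst(active);
> >>>>>>>>>>>>>>> active = new WriteBatchWithIndex();
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> // reads consult active, then frozen (newest first), then
> >>>>>>>>>>>>>>> // the DB; on commit, write all unwritten batches oldest
> >>>>>>>>>>>>>>> // first, freeing each once it is written and no longer
> >>>>>>>>>>>>>>> // referenced by an open query.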
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Obviously a drawback of this approach is the potential
> >> for
> >>>>>>>> increased
> >>>>>>>>>>>>>>> memory usage: if an Interactive Query is long-lived,
> >> for
> >>>>>> example by
> >>>>>>>>>>>>> doing a
> >>>>>>>>>>>>>>> full scan over a large database, or even just pausing
> >> in
> >>>> the
> >>>>>> middle
> >>>>>>>>>> of
> >>>>>>>>>>>>> an
> >>>>>>>>>>>>>>> iteration, then the existing chain of WriteBatches
> >> could be
> >>>>>> kept
> >>>>>>>>>>>> around
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>> a long time, potentially forever.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A.
> >>>>>>>>>>>>>>> Going off on a tangent, it looks like in addition to
> >>>> supporting
> >>>>>>>>>>>>>>> READ_COMMITTED queries, we could go further and support
> >>>>>>>>>>>> REPEATABLE_READ
> >>>>>>>>>>>>>>> queries (i.e. where subsequent reads to the same key
> >> in the
> >>>>>> same
> >>>>>>>>>>>>>>> Interactive Query are guaranteed to yield the same
> >> value)
> >>>> by
> >>>>>> making
> >>>>>>>>>>>> use
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so
> >> the
> >>>>>>>>>> performance
> >>>>>>>>>>>>>>> impact is likely to be negligible, but they do require
> >>>> that the
> >>>>>>>>>>>>> Interactive
> >>>>>>>>>>>>>>> Query session can be explicitly closed.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This could be achieved if we made the above
> >> readOnlyView
> >>>>>> interface
> >>>>>>>>>>>> look
> >>>>>>>>>>>>>>> more like:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> interface ReadOnlyKeyValueView<K, V>
> >>>>>>>>>>>>>>>         extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> interface ReadOnlyKeyValueStore<K, V> {
> >>>>>>>>>>>>>>>         ...
> >>>>>>>>>>>>>>>         ReadOnlyKeyValueView<K, V> readOnlyView(
> >>>>>>>>>>>>>>>                 IsolationLevel isolationLevel);
> >>>>>>>>>>>>>>> }
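> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Usage would then look something like this (sketch):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> // closing the view releases the underlying snapshot
> >>>>>>>>>>>>>>> try (ReadOnlyKeyValueView<String, Long> view =
> >>>>>>>>>>>>>>>          store.readOnlyView(IsolationLevel.READ_COMMITTED)) {
> >>>>>>>>>>>>>>>     Long count = view.get("some-key");
> >>>>>>>>>>>>>>> }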
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But this would be a breaking change, as existing IQv1
> >>>> queries
> >>>>>> are
> >>>>>>>>>>>>>>> guaranteed to never call store.close(), and therefore
> >> these
> >>>>>> would
> >>>>>>>>>> leak
> >>>>>>>>>>>>>>> memory under REPEATABLE_READ.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> B.
> >>>>>>>>>>>>>>> One thing that's notable: MyRocks states that they
> >> support
> >>>>>>>>>>>>> READ_COMMITTED
> >>>>>>>>>>>>>>> and REPEATABLE_READ, but they make no mention of
> >>>>>>>>>>>> READ_UNCOMMITTED[3][4].
> >>>>>>>>>>>>>>> This could be because doing so is technically
> >>>>>> difficult/impossible
> >>>>>>>>>>>> using
> >>>>>>>>>>>>>>> the primitives available in RocksDB.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Lucas, to address your points:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> U1.
> >>>>>>>>>>>>>>> It's only "SHOULD" to permit alternative (i.e.
> >> non-RocksDB)
> >>>>>>>>>>>>>>> implementations of StateStore that do not support
> >> atomic
> >>>>>> writes.
> >>>>>>>>>>>>> Obviously
> >>>>>>>>>>>>>>> in those cases, the guarantees Kafka Streams
> >>>> provides/expects
> >>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>>>>>> relaxed. Do you think we should require all
> >>>> implementations to
> >>>>>>>>>> support
> >>>>>>>>>>>>>>> atomic writes?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> U2.
> >>>>>>>>>>>>>>> Stores can support multiple IsolationLevels. As we've
> >>>> discussed
> >>>>>>>>>> above,
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> ideal scenario would be to specify the IsolationLevel
> >> at
> >>>>>>>> query-time.
> >>>>>>>>>>>>>>> Failing that, I think the second-best approach is to
> >>>> define the
> >>>>>>>>>>>>>>> IsolationLevel for *all* queries based on the
> >>>> processing.mode,
> >>>>>>>> which
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>> what the default StateStoreContext#isolationLevel()
> >>>> achieves.
> >>>>>> Would
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>> prefer an alternative?
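> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For reference, the intent of that default is roughly the
> >>>>>>>>>>>>>>> following (a sketch; eosEnabled() stands in for
> >>>>>>>>>>>>>>> "processing.mode is not at-least-once"):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> default IsolationLevel isolationLevel() {
> >>>>>>>>>>>>>>>     return eosEnabled() ? IsolationLevel.READ_COMMITTED
> >>>>>>>>>>>>>>>                         : IsolationLevel.READ_UNCOMMITTED;
> >>>>>>>>>>>>>>> }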
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> While the existing implementation is equivalent to
> >>>>>>>> READ_UNCOMMITTED,
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>> can yield unexpected results/errors under EOS, if a
> >>>>>> transaction is
> >>>>>>>>>>>>> rolled
> >>>>>>>>>>>>>>> back. While this would be a change in behaviour for
> >> users,
> >>>> it
> >>>>>> would
> >>>>>>>>>>>> look
> >>>>>>>>>>>>>>> more like a bug fix than a breaking change. That said,
> >> we
> >>>>>> *could*
> >>>>>>>>>> make
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>>>> configurable, and default to the existing behaviour
> >>>>>>>>>> (READ_UNCOMMITTED)
> >>>>>>>>>>>>>>> instead of inferring it from the processing.mode?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> N1, N2.
> >>>>>>>>>>>>>>> These were only primitives to avoid boxing costs, but
> >> since
> >>>>>> this is
> >>>>>>>>>>>> not
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> performance sensitive area, it should be fine to
> >> change if
> >>>>>> that's
> >>>>>>>>>>>>> desirable.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> N3.
> >>>>>>>>>>>>>>> It's because the store "manages its own offsets", which
> >>>>>> includes
> >>>>>>>> both
> >>>>>>>>>>>>>>> committing the offset, *and providing it* via
> >>>>>> getCommittedOffset().
> >>>>>>>>>>>>>>> Personally, I think "managesOffsets" conveys this best,
> >>>> but I
> >>>>>> don't
> >>>>>>>>>>>> mind
> >>>>>>>>>>>>>>> changing it if the nomenclature is unclear.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sorry for the massive emails/essays!
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1:
> >> https://github.com/facebook/rocksdb/wiki/Transactions
> >>>>>>>>>>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> >>>>>>>>>>>>>>> 3:
> >>>>>>>>
> >> https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> >>>>>>>>>>>>>>> 4:
> >>>> https://mariadb.com/kb/en/myrocks-transactional-isolation/
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> >>>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> since I last read it in April, the KIP has become much
> >>>>>> cleaner and
> >>>>>>>>>>>>>>>> easier to read. Great work!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It feels to me the last big open point is whether we
> >> can
> >>>>>> implement
> >>>>>>>>>>>>>>>> isolation level as a query parameter. I understand
> >> that
> >>>> there
> >>>>>> are
> >>>>>>>>>>>>>>>> implementation concerns, but as Colt says, it would
> >> be a
> >>>> great
> >>>>>>>>>>>>>>>> addition, and would also simplify the migration path
> >> for
> >>>> this
> >>>>>>>>>> change.
> >>>>>>>>>>>>>>>> Is the implementation problem you mentioned caused by
> >> the
> >>>>>>>> WriteBatch
> >>>>>>>>>>>>>>>> not having a notion of a snapshot, as the underlying
> >> DB
> >>>>>> iterator
> >>>>>>>>>>>> does?
> >>>>>>>>>>>>>>>> In that case, I am not sure a chain of WriteBatches
> >> as you
> >>>>>> propose
> >>>>>>>>>>>>>>>> would fully solve the problem, but maybe I didn't dig
> >>>> enough
> >>>>>> into
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> details to fully understand it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If it's not possible to implement it now, would it be
> >> an
> >>>>>> option to
> >>>>>>>>>>>>>>>> make sure in this KIP that we do not fully close the
> >> door
> >>>> on
> >>>>>>>>>>>> per-query
> >>>>>>>>>>>>>>>> isolation levels in the interface, as it may be
> >> possible
> >>>> to
> >>>>>>>>>> implement
> >>>>>>>>>>>>>>>> the missing primitives in RocksDB or Speedb in the
> >> future.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Understanding:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to
> >> be
> >>>>>> persisted
> >>>>>>>>>>>>>>>> atomically with the records?
> >>>>>>>>>>>>>>>> * U2) Don't understand the default implementation of
> >>>>>>>>>>>> `isolationLevel`.
> >>>>>>>>>>>>>>>> The isolation level should be a property of the
> >> underlying
> >>>>>> store,
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>> not be defined by the default config? Existing stores
> >>>> probably
> >>>>>>>> don't
> >>>>>>>>>>>>>>>> guarantee READ_COMMITTED, so the default should be to
> >>>> return
> >>>>>>>>>>>>>>>> READ_UNCOMMITTED.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nits:
> >>>>>>>>>>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong`
> >>>>>>>>>>>>>>>> return type, to avoid the `null`? (see the sketch after
> >>>>>>>>>>>>>>>> this list)
> >>>>>>>>>>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an
> >>>>>>>>>>>>>>>> `OptionalLong` return type, to avoid the `-1`?
> >>>>>>>>>>>>>>>> * N3) I don't understand why `managesOffsets` uses the
> >>>>>> 'manage'
> >>>>>>>>>> verb,
> >>>>>>>>>>>>>>>> whereas all other methods use the "commits" verb. I'd
> >>>> suggest
> >>>>>>>>>>>>>>>> `commitsOffsets`.
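> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For N1/N2, the signatures I have in mind would be roughly
> >>>>>>>>>>>>>>>> (hypothetical, for illustration):
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> OptionalLong committedOffset(TopicPartition partition);
> >>>>>>>>>>>>>>>>     // OptionalLong.empty() instead of null
> >>>>>>>>>>>>>>>> OptionalLong approximateNumUncommittedBytes();
> >>>>>>>>>>>>>>>>     // OptionalLong.empty() instead of -1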
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Either way, it feels this KIP is very close to the
> >> finish
> >>>>>> line,
> >>>>>>>> I'm
> >>>>>>>>>>>>>>>> looking forward to seeing this in production!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <
> >>>>>> c...@littlehorse.io
> >>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Making IsolationLevel a query-time constraint,
> >> rather
> >>>> than
> >>>>>>>> linking
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> the processing.guarantee.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As I understand it, would this allow even a user of
> >> EOS
> >>>> to
> >>>>>>>> control
> >>>>>>>>>>>>>>>> whether
> >>>>>>>>>>>>>>>>> to read committed or uncommitted records? If so, I am
> >>>> highly
> >>>>>> in
> >>>>>>>>>>>> favor
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I know that I was one of the early people to point
> >> out
> >>>> the
> >>>>>>>> current
> >>>>>>>>>>>>>>>>> shortcoming that IQ reads uncommitted records, but
> >> just
> >>>> this
> >>>>>>>>>>>> morning I
> >>>>>>>>>>>>>>>>> realized a pattern we use which means that (for
> >> certain
> >>>>>> queries)
> >>>>>>>>>> our
> >>>>>>>>>>>>>>>> system
> >>>>>>>>>>>>>>>>> needs to be able to read uncommitted records, which
> >> is
> >>>> the
> >>>>>>>> current
> >>>>>>>>>>>>>>>> behavior
> >>>>>>>>>>>>>>>>> of Kafka Streams in EOS.***
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If IsolationLevel being a query-time decision allows
> >> for
> >>>>>> this,
> >>>>>>>> then
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> would be amazing. I would also vote that the default
> >>>> behavior
> >>>>>>>>>> should
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> reading uncommitted records, because it is totally
> >>>> possible
> >>>>>> for a
> >>>>>>>>>>>>> valid
> >>>>>>>>>>>>>>>>> application to depend on that behavior, and breaking
> >> it
> >>>> in a
> >>>>>>>> minor
> >>>>>>>>>>>>>>>> release
> >>>>>>>>>>>>>>>>> might be a bit strong.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> *** (Note, for the curious reader....) Our
> >> use-case/query
> >>>>>> pattern
> >>>>>>>>>>>> is a
> >>>>>>>>>>>>>>>> bit
> >>>>>>>>>>>>>>>>> complex, but reading "uncommitted" records is
> >> actually
> >>>> safe
> >>>>>> in
> >>>>>>>> our
> >>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>>> because processing is deterministic. Additionally, IQ
> >>>> being
> >>>>>> able
> >>>>>>>> to
> >>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>> uncommitted records is crucial to enable "read your
> >> own
> >>>>>> writes"
> >>>>>>>> on
> >>>>>>>>>>>> our
> >>>>>>>>>>>>>>>> API:
> >>>>>>>>>>>>>>>>> Due to the deterministic processing, we send an
> >> "ack" to
> >>>> the
> >>>>>>>> client
> >>>>>>>>>>>>> who
> >>>>>>>>>>>>>>>>> makes the request as soon as the processor processes
> >> the
> >>>>>> result.
> >>>>>>>> If
> >>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>> can't read uncommitted records, they may receive a
> >> "201 -
> >>>>>>>> Created"
> >>>>>>>>>>>>>>>>> response, immediately followed by a "404 - Not Found"
> >>>> when
> >>>>>> doing
> >>>>>>>> a
> >>>>>>>>>>>>>>>> lookup
> >>>>>>>>>>>>>>>>> for the object they just created.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
> >>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Addendum:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I think we would also face the same problem with the
> >>>>>> approach
> >>>>>>>> John
> >>>>>>>>>>>>>>>> outlined
> >>>>>>>>>>>>>>>>>> earlier (using the record cache as a transaction
> >> buffer
> >>>> and
> >>>>>>>>>>>> flushing
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> straight to SST files). This is because the record
> >> cache
> >>>>>> (the
> >>>>>>>>>>>>>>>> ThreadCache
> >>>>>>>>>>>>>>>>>> class) is not thread-safe, so every commit would
> >>>> invalidate
> >>>>>> open
> >>>>>>>>>> IQ
> >>>>>>>>>>>>>>>>>> Iterators in the same way that RocksDB WriteBatches
> >> do.
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <
> >>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I've updated the KIP based on our conversation. The
> >>>> only
> >>>>>> things
> >>>>>>>>>>>>>>>> I've not
> >>>>>>>>>>>>>>>>>>> yet done are:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. Using transactions under ALOS and EOS.
> >>>>>>>>>>>>>>>>>>> 2. Making IsolationLevel a query-time constraint,
> >>>> rather
> >>>>>> than
> >>>>>>>>>>>>>>>> linking it
> >>>>>>>>>>>>>>>>>>> to the processing.guarantee.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> There's a wrinkle that makes this a challenge:
> >>>> Interactive
> >>>>>>>>>> Queries
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> open an Iterator, when using transactions and
> >>>>>> READ_UNCOMMITTED.
> >>>>>>>>>>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries
> >>>> need
> >>>>>> to be
> >>>>>>>>>>>> able
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> read records from the currently uncommitted
> >> transaction
> >>>>>> buffer
> >>>>>>>>>>>>>>>>>>> (WriteBatch). This includes for Iterators, which
> >> should
> >>>>>> iterate
> >>>>>>>>>>>>>>>> both the
> >>>>>>>>>>>>>>>>>>> transaction buffer and underlying database (using
> >>>>>>>>>>>>>>>>>>> WriteBatch#iteratorWithBase()).
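> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> (For reference, I believe the RocksDB Java API for this
> >>>>>>>>>>>>>>>>>>> is WriteBatchWithIndex#newIteratorWithBase, roughly:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> // merge uncommitted batch writes over the DB iterator
> >>>>>>>>>>>>>>>>>>> RocksIterator base = db.newIterator();
> >>>>>>>>>>>>>>>>>>> RocksIterator merged = batch.newIteratorWithBase(base);
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> where batch is the WriteBatchWithIndex that holds the
> >>>>>>>>>>>>>>>>>>> ongoing transaction.)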
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The issue is that when the StreamThread commits, it
> >>>> writes
> >>>>>> the
> >>>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>> WriteBatch to RocksDB *and then clears the
> >> WriteBatch*.
> >>>>>>>> Clearing
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> WriteBatch while an Interactive Query holds an open
> >>>>>> Iterator on
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>> invalidate the Iterator. Worse, it turns out that
> >>>> Iterators
> >>>>>>>> over
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> WriteBatch become invalidated not just when the
> >>>> WriteBatch
> >>>>>> is
> >>>>>>>>>>>>>>>> cleared,
> >>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>> also when the Iterator's current key receives a new
> >>>> write.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Now that I'm writing this, I remember that this is
> >> the
> >>>>>> major
> >>>>>>>>>>>> reason
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>> switched the original design from having a
> >> query-time
> >>>>>>>>>>>>>>>> IsolationLevel to
> >>>>>>>>>>>>>>>>>>> having the IsolationLevel linked to the
> >>>> transactionality
> >>>>>> of the
> >>>>>>>>>>>>>>>> stores
> >>>>>>>>>>>>>>>>>>> themselves.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It *might* be possible to resolve this, by having a
> >>>>>> "chain" of
> >>>>>>>>>>>>>>>>>>> WriteBatches, with the StreamThread switching to a
> >> new
> >>>>>>>> WriteBatch
> >>>>>>>>>>>>>>>>>> whenever
> >>>>>>>>>>>>>>>>>>> a new Interactive Query attempts to read from the
> >>>>>> database, but
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>> cause some performance problems/memory pressure
> >> when
> >>>>>> subjected
> >>>>>>>> to
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> high
> >>>>>>>>>>>>>>>>>>> Interactive Query load. It would also reduce the
> >>>>>> efficiency of
> >>>>>>>>>>>>>>>>>> WriteBatches
> >>>>>>>>>>>>>>>>>>> on-commit, as we'd have to write N WriteBatches,
> >> where
> >>>> N
> >>>>>> is the
> >>>>>>>>>>>>>>>> number of
> >>>>>>>>>>>>>>>>>>> Interactive Queries since the last commit.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I realise this is getting into the weeds of the
> >>>>>> implementation,
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> you'd
> >>>>>>>>>>>>>>>>>>> rather we focus on the API for now, but I think
> >> it's
> >>>>>> important
> >>>>>>>> to
> >>>>>>>>>>>>>>>>>> consider
> >>>>>>>>>>>>>>>>>>> how to implement the desired API, in case we come
> >> up
> >>>> with
> >>>>>> an
> >>>>>>>> API
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> cannot be implemented efficiently, or even at all!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
> >>>>>>>> cado...@apache.org
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 6.
> >>>>>>>>>>>>>>>>>>>> Of course, you are right! My bad!
> >>>>>>>>>>>>>>>>>>>> Wiping out the state in the downgrading case is
> >> fine.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3a.
> >>>>>>>>>>>>>>>>>>>> Focus on the public facing changes for the KIP. We
> >>>> will
> >>>>>> manage
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>> the internals right. Regarding state stores that
> >> do
> >>>> not
> >>>>>>>> support
> >>>>>>>>>>>>>>>>>>>> READ_COMMITTED, they should throw an error stating
> >>>> that
> >>>>>> they
> >>>>>>>> do
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state
> >>>> stores
> >>>>>>>>>>>>>>>> immediately.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3b.
> >>>>>>>>>>>>>>>>>>>> I am in favor of using transactions also for ALOS.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for getting back to me!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>> The fact that implementations can always track
> >>>> estimated
> >>>>>>>> memory
> >>>>>>>>>>>>>>>> usage
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as
> >> an
> >>>>>> option,
> >>>>>>>> and
> >>>>>>>>>>>>>>>> I'll
> >>>>>>>>>>>>>>>>>>>> clarify
> >>>>>>>>>>>>>>>>>>>>> the JavaDoc that 0 is not just for
> >> non-transactional
> >>>>>> stores,
> >>>>>>>>>>>>>>>> which is
> >>>>>>>>>>>>>>>>>>>>> currently misleading.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 6.
> >>>>>>>>>>>>>>>>>>>>> The problem with catching the exception in the
> >>>> downgrade
> >>>>>>>>>> process
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> that it
> >>>>>>>>>>>>>>>>>>>>> would require new code in the Kafka version being
> >>>>>> downgraded
> >>>>>>>>>> to.
> >>>>>>>>>>>>>>>> Since
> >>>>>>>>>>>>>>>>>>>>> users could conceivably downgrade to almost *any*
> >>>> older
> >>>>>>>> version
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>> Streams, I'm not sure how we could add that code?
> >>>>>>>>>>>>>>>>>>>>> The only way I can think of doing it would be to
> >>>> provide
> >>>>>> a
> >>>>>>>>>>>>>>>> dedicated
> >>>>>>>>>>>>>>>>>>>>> downgrade tool, that goes through every local
> >> store
> >>>> and
> >>>>>>>> removes
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> offsets column families. But that seems like an
> >>>>>> unnecessary
> >>>>>>>>>>>>>>>> amount of
> >>>>>>>>>>>>>>>>>>>> extra
> >>>>>>>>>>>>>>>>>>>>> code to maintain just to handle a somewhat niche
> >>>>>> situation,
> >>>>>>>>>> when
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> alternative (automatically wipe and restore
> >> stores)
> >>>>>> should be
> >>>>>>>>>>>>>>>>>>>> acceptable.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've
> >>>> requested.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 3a.
> >>>>>>>>>>>>>>>>>>>>> I agree that IsolationLevel makes more sense at
> >>>>>> query-time,
> >>>>>>>> and
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>>>> initially attempted to place the IsolationLevel
> >> at
> >>>>>>>> query-time,
> >>>>>>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>>> ran
> >>>>>>>>>>>>>>>>>>>>> into some problems:
> >>>>>>>>>>>>>>>>>>>>> - The key issue is that, under ALOS we're not
> >> staging
> >>>>>> writes
> >>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> transactions, so can't perform writes at the
> >>>>>> READ_COMMITTED
> >>>>>>>>>>>>>>>> isolation
> >>>>>>>>>>>>>>>>>>>>> level. However, this may be addressed if we
> >> decide to
> >>>>>>>> *always*
> >>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>> transactions as discussed under 3b.
> >>>>>>>>>>>>>>>>>>>>> - IQv1 and IQv2 have quite different
> >>>> implementations. I
> >>>>>>>>>> remember
> >>>>>>>>>>>>>>>>>> having
> >>>>>>>>>>>>>>>>>>>>> some difficulty understanding the IQv1 internals,
> >>>> which
> >>>>>> made
> >>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> difficult
> >>>>>>>>>>>>>>>>>>>>> to determine what needed to be changed. However,
> >> I
> >>>>>> *think*
> >>>>>>>> this
> >>>>>>>>>>>>>>>> can be
> >>>>>>>>>>>>>>>>>>>>> addressed for both implementations by wrapping
> >> the
> >>>>>>>> RocksDBStore
> >>>>>>>>>>>>>>>> in an
> >>>>>>>>>>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides
> >> read
> >>>>>> methods
> >>>>>>>>>>>>>>>> (get,
> >>>>>>>>>>>>>>>>>>>> etc.)
> >>>>>>>>>>>>>>>>>>>>> to either read directly from the database or
> >> from the
> >>>>>> ongoing
> >>>>>>>>>>>>>>>>>>>> transaction.
> >>>>>>>>>>>>>>>>>>>>> But IQv1 might still be difficult.
> >>>>>>>>>>>>>>>>>>>>> - If IsolationLevel becomes a query constraint,
> >> then
> >>>> all
> >>>>>>>> other
> >>>>>>>>>>>>>>>>>>>> StateStores
> >>>>>>>>>>>>>>>>>>>>> will need to respect it, including the in-memory
> >>>> stores.
> >>>>>> This
> >>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>>>>>>> us to adapt in-memory stores to stage their
> >> writes so
> >>>>>> they
> >>>>>>>> can
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> isolated
> >>>>>>>>>>>>>>>>>>>>> from READ_COMMITTED queries. It would also
> >> become an
> >>>>>>>> important
> >>>>>>>>>>>>>>>>>>>>> consideration for third-party stores on upgrade,
> >> as
> >>>>>> without
> >>>>>>>>>>>>>>>> changes,
> >>>>>>>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>>> would not support READ_COMMITTED queries
> >> correctly.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Ultimately, I may need some help making the
> >> necessary
> >>>>>> change
> >>>>>>>> to
> >>>>>>>>>>>>>>>> IQv1
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> support this, but I don't think it's
> >> fundamentally
> >>>>>>>> impossible,
> >>>>>>>>>>>>>>>> if we
> >>>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>>> to pursue this route.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 3b.
> >>>>>>>>>>>>>>>>>>>>> The main reason I chose to keep ALOS
> >> un-transactional
> >>>>>> was to
> >>>>>>>>>>>>>>>> minimize
> >>>>>>>>>>>>>>>>>>>>> behavioural change for most users (I believe most
> >>>> Streams
> >>>>>>>> users
> >>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> default configuration, which is ALOS). That said,
> >>>> it's
> >>>>>> clear
> >>>>>>>>>>>>>>>> that if
> >>>>>>>>>>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>>>>>> also used transactional stores, the only change
> >> in
> >>>>>> behaviour
> >>>>>>>>>>>>>>>> would be
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> it would become *more correct*, which could be
> >>>>>> considered a
> >>>>>>>>>> "bug
> >>>>>>>>>>>>>>>> fix"
> >>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>> users, rather than a change they need to handle.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I believe that performance using transactions
> >> (aka.
> >>>>>> RocksDB
> >>>>>>>>>>>>>>>>>>>> WriteBatches)
> >>>>>>>>>>>>>>>>>>>>> should actually be *better* than the un-batched
> >>>>>> write-path
> >>>>>>>> that
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> currently used[1]. The only "performance"
> >>>> consideration
> >>>>>> will
> >>>>>>>> be
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> increased memory usage that transactions require.
> >>>> Given
> >>>>>> the
> >>>>>>>>>>>>>>>>>> mitigations
> >>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> this memory that we have in place, I would expect
> >>>> that
> >>>>>> this
> >>>>>>>> is
> >>>>>>>>>>>>>>>> not a
> >>>>>>>>>>>>>>>>>>>>> problem for most users.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> If we're happy to do so, we can make ALOS also
> >> use
> >>>>>>>>>> transactions.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Link 1:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>
> >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
> >>>>>>>>>>>> cado...@apache.org
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the updates and sorry for the delay
> >> on my
> >>>>>> side!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>> Making the default implementation for flush() a
> >>>> no-op
> >>>>>> sounds
> >>>>>>>>>>>>>>>> good to
> >>>>>>>>>>>>>>>>>>>> me.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>> I think what was bugging me here is that a
> >>>> third-party
> >>>>>> state
> >>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>>>>>>>>> to implement the state store interface. That
> >> means
> >>>> they
> >>>>>> need
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> implement a wrapper around the actual state
> >> store
> >>>> as we
> >>>>>> do
> >>>>>>>> for
> >>>>>>>>>>>>>>>>>> RocksDB
> >>>>>>>>>>>>>>>>>>>>>> with RocksDBStore. So, a third-party state
> >> store can
> >>>>>> always
> >>>>>>>>>>>>>>>> estimate
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> uncommitted bytes, if it wants, because the
> >> wrapper
> >>>> can
> >>>>>>>> record
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> added
> >>>>>>>>>>>>>>>>>>>>>> bytes.
> >>>>>>>>>>>>>>>>>>>>>> One case I can think of where returning -1 makes
> >>>> sense
> >>>>>> is
> >>>>>>>> when
> >>>>>>>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>>>>>> does not need to estimate the size of the write
> >>>> batch
> >>>>>> and
> >>>>>>>>>>>>>>>> trigger
> >>>>>>>>>>>>>>>>>>>>>> extraordinary commits, because the third-party
> >> state
> >>>>>> store
> >>>>>>>>>>>>>>>> takes care
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> memory. But in that case the method could also
> >> just
> >>>>>> return
> >>>>>>>> 0.
> >>>>>>>>>>>>>>>> Even
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> case would be better solved with a method that
> >>>> returns
> >>>>>>>> whether
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>> store itself manages the memory used for
> >> uncommitted
> >>>>>> bytes
> >>>>>>>> or
> >>>>>>>>>>>>>>>> not.
> >>>>>>>>>>>>>>>>>>>>>> Said that, I am fine with keeping the -1 return
> >>>> value,
> >>>>>> I was
> >>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>>>> wondering when and if it will be used.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regarding returning 0 for transactional state
> >> stores
> >>>>>> when
> >>>>>>>> the
> >>>>>>>>>>>>>>>> batch
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> empty, I was just wondering because you
> >> explicitly
> >>>>>> stated
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> "or {@code 0} if this StateStore does not
> >> support
> >>>>>>>>>>>> transactions."
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> So it seemed to me returning 0 could only
> >> happen for
> >>>>>>>>>>>>>>>>>> non-transactional
> >>>>>>>>>>>>>>>>>>>>>> state stores.
> >>>>>>>>>>>>>>>>>>>>>>
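> >>>>>>>>>>>>>>>>>>>>>> As a rough sketch of the wrapper idea above (class and method names
> >>>>>>>>>>>>>>>>>>>>>> here are hypothetical, not part of the KIP), the bookkeeping could
> >>>>>>>>>>>>>>>>>>>>>> look like this:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     import java.util.concurrent.atomic.AtomicLong;
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     // Hypothetical sketch: a store wrapper records the size of
> >>>>>>>>>>>>>>>>>>>>>>     // each write, so approximateNumUncommittedBytes() can be
> >>>>>>>>>>>>>>>>>>>>>>     // estimated even when the wrapped store cannot report it.
> >>>>>>>>>>>>>>>>>>>>>>     public class UncommittedBytesTracker {
> >>>>>>>>>>>>>>>>>>>>>>         private final AtomicLong uncommittedBytes = new AtomicLong(0);
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>         // called by the wrapper on every put/delete before
> >>>>>>>>>>>>>>>>>>>>>>         // delegating to the wrapped store
> >>>>>>>>>>>>>>>>>>>>>>         public void recordWrite(byte[] key, byte[] value) {
> >>>>>>>>>>>>>>>>>>>>>>             uncommittedBytes.addAndGet(
> >>>>>>>>>>>>>>>>>>>>>>                 key.length + (value == null ? 0 : value.length));
> >>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>         // called after a successful commit; the buffered writes
> >>>>>>>>>>>>>>>>>>>>>>         // are now persisted, so the estimate resets to zero
> >>>>>>>>>>>>>>>>>>>>>>         public void recordCommit() {
> >>>>>>>>>>>>>>>>>>>>>>             uncommittedBytes.set(0);
> >>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>         public long approximateNumUncommittedBytes() {
> >>>>>>>>>>>>>>>>>>>>>>             return uncommittedBytes.get();
> >>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>     }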
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> a) What do you think if we move the isolation level to IQ (v1 and
> >>>>>>>>>>>>>>>>>>>>>> v2)? In the end this is the only component that really needs to
> >>>>>>>>>>>>>>>>>>>>>> specify the isolation level. It is similar to the Kafka consumer
> >>>>>>>>>>>>>>>>>>>>>> that can choose with what isolation level to read the input topic.
> >>>>>>>>>>>>>>>>>>>>>> For IQv1 the isolation level should go into StoreQueryParameters.
> >>>>>>>>>>>>>>>>>>>>>> For IQv2, I would add it to the Query interface.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> b) Point a) raises the question what should happen during
> >>>>>>>>>>>>>>>>>>>>>> at-least-once processing when the state store does not use
> >>>>>>>>>>>>>>>>>>>>>> transactions? John in the past proposed to also use transactions on
> >>>>>>>>>>>>>>>>>>>>>> state stores for at-least-once. I like that idea, because it avoids
> >>>>>>>>>>>>>>>>>>>>>> aggregating the same records over and over again in the case of a
> >>>>>>>>>>>>>>>>>>>>>> failure. We had a case in the past where a Streams application in
> >>>>>>>>>>>>>>>>>>>>>> at-least-once mode was failing continuously, for some reasons I do
> >>>>>>>>>>>>>>>>>>>>>> not remember, before committing the offsets. After each failover,
> >>>>>>>>>>>>>>>>>>>>>> the app aggregated again and again the same records. Of course the
> >>>>>>>>>>>>>>>>>>>>>> aggregate increased to very wrong values just because of the
> >>>>>>>>>>>>>>>>>>>>>> failover. With transactions on the state stores we could have
> >>>>>>>>>>>>>>>>>>>>>> avoided this. The app would have output the same aggregate multiple
> >>>>>>>>>>>>>>>>>>>>>> times (i.e., after each failover) but at least the value of the
> >>>>>>>>>>>>>>>>>>>>>> aggregate would not depend on the number of failovers. Outputting
> >>>>>>>>>>>>>>>>>>>>>> the same aggregate multiple times would be incorrect under
> >>>>>>>>>>>>>>>>>>>>>> exactly-once but it is OK for at-least-once.
> >>>>>>>>>>>>>>>>>>>>>> Whether it makes sense to add a config to turn transactions on state
> >>>>>>>>>>>>>>>>>>>>>> stores on and off under at-least-once, or to just use transactions
> >>>>>>>>>>>>>>>>>>>>>> in any case, is a question we should also discuss in this KIP. It
> >>>>>>>>>>>>>>>>>>>>>> depends a bit on the performance trade-off. Maybe to be safe, I
> >>>>>>>>>>>>>>>>>>>>>> would add a config.
> >>>>>>>>>>>>>>>>>>>>>>
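> >>>>>>>>>>>>>>>>>>>>>> To make a) a bit more concrete, one hypothetical shape this could
> >>>>>>>>>>>>>>>>>>>>>> take (illustrative only, not an agreed API):
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     import org.apache.kafka.common.IsolationLevel;
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     // Hypothetical sketch: the isolation level travels with the
> >>>>>>>>>>>>>>>>>>>>>>     // query, not with the store. For IQv1 this could be a property
> >>>>>>>>>>>>>>>>>>>>>>     // of StoreQueryParameters; for IQv2, of the Query interface.
> >>>>>>>>>>>>>>>>>>>>>>     public interface IsolationLevelAwareQuery {
> >>>>>>>>>>>>>>>>>>>>>>         // READ_COMMITTED: only writes committed by the Task are
> >>>>>>>>>>>>>>>>>>>>>>         // visible. READ_UNCOMMITTED: writes are visible as soon
> >>>>>>>>>>>>>>>>>>>>>>         // as they happen.
> >>>>>>>>>>>>>>>>>>>>>>         IsolationLevel isolationLevel();
> >>>>>>>>>>>>>>>>>>>>>>     }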
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>>>>>>>> Your points are all valid. I tend to say to keep the metrics around
> >>>>>>>>>>>>>>>>>>>>>> flush() until we remove flush() completely from the interface. Calls
> >>>>>>>>>>>>>>>>>>>>>> to flush() might still exist since existing processors might still
> >>>>>>>>>>>>>>>>>>>>>> call flush() explicitly, as you mentioned in 1). For sure, we need
> >>>>>>>>>>>>>>>>>>>>>> to document how the metrics change due to the transactions in the
> >>>>>>>>>>>>>>>>>>>>>> upgrade notes.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>>>>>>>>> I see. Then you should describe how the .position files are handled
> >>>>>>>>>>>>>>>>>>>>>> in a dedicated section of the KIP, or incorporate the description in
> >>>>>>>>>>>>>>>>>>>>>> the "Atomic Checkpointing" section, instead of only mentioning it in
> >>>>>>>>>>>>>>>>>>>>>> the "Compatibility, Deprecation, and Migration Plan".
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 6.
> >>>>>>>>>>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP is a good idea.
> >>>>>>>>>>>>>>>>>>>>>> Regarding downgrading, I think you could also catch the exception
> >>>>>>>>>>>>>>>>>>>>>> and do what is needed to downgrade, e.g., drop the column family.
> >>>>>>>>>>>>>>>>>>>>>> See here for an example:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It is a bit brittle, but it works.
> >>>>>>>>>>>>>>>>>>>>>>
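> >>>>>>>>>>>>>>>>>>>>>> For readers following along, the pattern looks roughly like this
> >>>>>>>>>>>>>>>>>>>>>> with the RocksDB Java API (the path and the "offsets" column family
> >>>>>>>>>>>>>>>>>>>>>> name are illustrative; this is not the KIP's actual code):
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     import java.util.ArrayList;
> >>>>>>>>>>>>>>>>>>>>>>     import java.util.List;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.ColumnFamilyDescriptor;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.ColumnFamilyHandle;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.DBOptions;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.Options;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDB;
> >>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDBException;
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     // Sketch of downgrade-by-dropping (illustrative): open every
> >>>>>>>>>>>>>>>>>>>>>>     // column family RocksDB reports, then drop the one the old
> >>>>>>>>>>>>>>>>>>>>>>     // code no longer understands, instead of wiping the store.
> >>>>>>>>>>>>>>>>>>>>>>     public class DowngradeSketch {
> >>>>>>>>>>>>>>>>>>>>>>         public static void main(String[] args) throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>             RocksDB.loadLibrary();
> >>>>>>>>>>>>>>>>>>>>>>             final String path = "/tmp/state-store"; // illustrative
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>             // RocksDB refuses to open unless all existing column
> >>>>>>>>>>>>>>>>>>>>>>             // families are opened, so list them first.
> >>>>>>>>>>>>>>>>>>>>>>             final List<byte[]> names = new ArrayList<>();
> >>>>>>>>>>>>>>>>>>>>>>             try (Options listOptions = new Options()) {
> >>>>>>>>>>>>>>>>>>>>>>                 names.addAll(RocksDB.listColumnFamilies(listOptions, path));
> >>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>             if (names.isEmpty()) {
> >>>>>>>>>>>>>>>>>>>>>>                 names.add(RocksDB.DEFAULT_COLUMN_FAMILY);
> >>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>             final List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
> >>>>>>>>>>>>>>>>>>>>>>             for (final byte[] name : names) {
> >>>>>>>>>>>>>>>>>>>>>>                 descriptors.add(new ColumnFamilyDescriptor(name));
> >>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>             final List<ColumnFamilyHandle> handles = new ArrayList<>();
> >>>>>>>>>>>>>>>>>>>>>>             try (DBOptions options = new DBOptions().setCreateIfMissing(true);
> >>>>>>>>>>>>>>>>>>>>>>                  RocksDB db = RocksDB.open(options, path, descriptors, handles)) {
> >>>>>>>>>>>>>>>>>>>>>>                 for (int i = 0; i < names.size(); i++) {
> >>>>>>>>>>>>>>>>>>>>>>                     // hypothetical name of the KIP-892 offsets CF
> >>>>>>>>>>>>>>>>>>>>>>                     if ("offsets".equals(new String(names.get(i)))) {
> >>>>>>>>>>>>>>>>>>>>>>                         db.dropColumnFamily(handles.get(i));
> >>>>>>>>>>>>>>>>>>>>>>                     }
> >>>>>>>>>>>>>>>>>>>>>>                 }
> >>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>     }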
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for taking the time to review the KIP. I'm back from leave
> >>>>>>>>>>>>>>>>>>>>>>> now and intend to move this forwards as quickly as I can.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Addressing your points:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>> Because flush() is part of the StateStore API, it's exposed to
> >>>>>>>>>>>>>>>>>>>>>>> custom Processors, which might be making calls to flush(). This was
> >>>>>>>>>>>>>>>>>>>>>>> actually the case in a few integration tests.
> >>>>>>>>>>>>>>>>>>>>>>> To maintain as much compatibility as possible, I'd prefer not to
> >>>>>>>>>>>>>>>>>>>>>>> make this an UnsupportedOperationException, as it will cause
> >>>>>>>>>>>>>>>>>>>>>>> previously working Processors to start throwing exceptions at
> >>>>>>>>>>>>>>>>>>>>>>> runtime.
> >>>>>>>>>>>>>>>>>>>>>>> I agree that it doesn't make sense for it to proxy commit(), though,
> >>>>>>>>>>>>>>>>>>>>>>> as that would cause it to violate the "StateStores commit only when
> >>>>>>>>>>>>>>>>>>>>>>> the Task commits" rule.
> >>>>>>>>>>>>>>>>>>>>>>> Instead, I think we should make this a no-op. That way, existing
> >>>>>>>>>>>>>>>>>>>>>>> user Processors will continue to work as before, without violation
> >>>>>>>>>>>>>>>>>>>>>>> of store consistency that would be caused by premature flush/commit
> >>>>>>>>>>>>>>>>>>>>>>> of StateStore data to disk.
> >>>>>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>>>>
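> >>>>>>>>>>>>>>>>>>>>>>> A minimal sketch of that no-op default, on a stand-in interface
> >>>>>>>>>>>>>>>>>>>>>>> (not the exact KIP wording):
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     // Sketch: flush() stays on the interface for compatibility,
> >>>>>>>>>>>>>>>>>>>>>>>     // but the default implementation does nothing, so existing
> >>>>>>>>>>>>>>>>>>>>>>>     // Processors that call it keep working without forcing a
> >>>>>>>>>>>>>>>>>>>>>>>     // premature flush/commit of uncommitted data.
> >>>>>>>>>>>>>>>>>>>>>>>     public interface StateStoreSketch {
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         /**
> >>>>>>>>>>>>>>>>>>>>>>>          * Kept as a deliberate no-op: data is persisted by the
> >>>>>>>>>>>>>>>>>>>>>>>          * commit lifecycle instead, so existing callers do not
> >>>>>>>>>>>>>>>>>>>>>>>          * start failing at runtime.
> >>>>>>>>>>>>>>>>>>>>>>>          */
> >>>>>>>>>>>>>>>>>>>>>>>         default void flush() {
> >>>>>>>>>>>>>>>>>>>>>>>             // no-op
> >>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>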
> >>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore implementation is
> >>>>>>>>>>>>>>>>>>>>>>> transactional, but is unable to estimate the uncommitted memory
> >>>>>>>>>>>>>>>>>>>>>>> usage, the method will return -1.
> >>>>>>>>>>>>>>>>>>>>>>> The intention here is to permit third-party implementations that may
> >>>>>>>>>>>>>>>>>>>>>>> not be able to estimate memory usage.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been written to the store yet. I
> >>>>>>>>>>>>>>>>>>>>>>> thought that was implied by "This method will return an
> >>>>>>>>>>>>>>>>>>>>>>> approximation of the memory would be freed by the next call to
> >>>>>>>>>>>>>>>>>>>>>>> {@link #commit(Map)}" and "@return The approximate size of all
> >>>>>>>>>>>>>>>>>>>>>>> records awaiting {@link #commit(Map)}", however, I can add it
> >>>>>>>>>>>>>>>>>>>>>>> explicitly to the JavaDoc if you think this is unclear?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>> I realise this is probably the most contentious point in my design,
> >>>>>>>>>>>>>>>>>>>>>>> and I'm open to changing it if I'm unable to convince you of the
> >>>>>>>>>>>>>>>>>>>>>>> benefits. Nevertheless, here's my argument:
> >>>>>>>>>>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are directly provided StateStores
> >>>>>>>>>>>>>>>>>>>>>>> to query, and it may be important for users to programmatically know
> >>>>>>>>>>>>>>>>>>>>>>> which mode the StateStore is operating under. If we simply provide
> >>>>>>>>>>>>>>>>>>>>>>> an "eosEnabled" boolean (as used throughout the internal streams
> >>>>>>>>>>>>>>>>>>>>>>> engine), or similar, then users will need to understand the
> >>>>>>>>>>>>>>>>>>>>>>> operation and consequences of each available processing mode and how
> >>>>>>>>>>>>>>>>>>>>>>> it pertains to their StateStore.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Interactive Query users aren't the only people that care about the
> >>>>>>>>>>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore: implementers of
> >>>>>>>>>>>>>>>>>>>>>>> custom StateStores also need to understand the behaviour expected of
> >>>>>>>>>>>>>>>>>>>>>>> their implementation. KIP-892 introduces some assumptions into the
> >>>>>>>>>>>>>>>>>>>>>>> Streams Engine about how StateStores operate under each processing
> >>>>>>>>>>>>>>>>>>>>>>> mode, and it's important that custom implementations adhere to those
> >>>>>>>>>>>>>>>>>>>>>>> assumptions in order to maintain the consistency guarantees.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> IsolationLevels provide a high-level contract on the behaviour of
> >>>>>>>>>>>>>>>>>>>>>>> the StateStore: a user knows that under READ_COMMITTED, they will
> >>>>>>>>>>>>>>>>>>>>>>> see writes only after the Task has committed, and under
> >>>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED they will see writes immediately. No understanding
> >>>>>>>>>>>>>>>>>>>>>>> of the details of each processing.mode is required, either for IQ
> >>>>>>>>>>>>>>>>>>>>>>> users or StateStore implementers.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> An argument can be made that these contractual guarantees can simply
> >>>>>>>>>>>>>>>>>>>>>>> be documented for the processing.mode (i.e. that exactly-once and
> >>>>>>>>>>>>>>>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves
> >>>>>>>>>>>>>>>>>>>>>>> like READ_UNCOMMITTED), but there are several small issues with this
> >>>>>>>>>>>>>>>>>>>>>>> I'd prefer to avoid:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     - Where would we document these contracts, in a way that is
> >>>>>>>>>>>>>>>>>>>>>>>       difficult for users/implementers to miss/ignore?
> >>>>>>>>>>>>>>>>>>>>>>>     - It's not clear to users that the processing mode is
> >>>>>>>>>>>>>>>>>>>>>>>       communicating an expectation of read isolation, unless they
> >>>>>>>>>>>>>>>>>>>>>>>       read the documentation. Users rarely consult documentation
> >>>>>>>>>>>>>>>>>>>>>>>       unless they feel they need to, so it's likely this detail would
> >>>>>>>>>>>>>>>>>>>>>>>       get missed by many users.
> >>>>>>>>>>>>>>>>>>>>>>>     - It tightly couples processing modes to read isolation. Adding
> >>>>>>>>>>>>>>>>>>>>>>>       new processing modes, or changing the read isolation of
> >>>>>>>>>>>>>>>>>>>>>>>       existing processing modes, would be difficult/impossible.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Ultimately, the cost of introducing IsolationLevels is just a single
> >>>>>>>>>>>>>>>>>>>>>>> method, since we re-use the existing IsolationLevel enum from Kafka.
> >>>>>>>>>>>>>>>>>>>>>>> This gives us a clear place to document the contractual guarantees
> >>>>>>>>>>>>>>>>>>>>>>> expected of/provided by StateStores, that is accessible both by the
> >>>>>>>>>>>>>>>>>>>>>>> StateStore itself, and by IQ users.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> (Writing this I've just realised that the StateStore and IQ APIs
> >>>>>>>>>>>>>>>>>>>>>>> actually don't provide access to the StateStoreContext that IQ users
> >>>>>>>>>>>>>>>>>>>>>>> would have direct access to... Perhaps StateStore should expose
> >>>>>>>>>>>>>>>>>>>>>>> isolationLevel() itself too?)
> >>>>>>>>>>>>>>>>>>>>>>>
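> >>>>>>>>>>>>>>>>>>>>>>> For illustration, the mapping implied above could be sketched like
> >>>>>>>>>>>>>>>>>>>>>>> this, re-using Kafka's existing enum (the helper class is
> >>>>>>>>>>>>>>>>>>>>>>> hypothetical, not part of the KIP):
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     import org.apache.kafka.common.IsolationLevel;
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     // Hypothetical helper showing the contract described above:
> >>>>>>>>>>>>>>>>>>>>>>>     // exactly-once(-v2) maps to READ_COMMITTED; at-least-once
> >>>>>>>>>>>>>>>>>>>>>>>     // maps to READ_UNCOMMITTED.
> >>>>>>>>>>>>>>>>>>>>>>>     public final class ProcessingModeIsolation {
> >>>>>>>>>>>>>>>>>>>>>>>         private ProcessingModeIsolation() {}
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         public static IsolationLevel fromProcessingGuarantee(final String guarantee) {
> >>>>>>>>>>>>>>>>>>>>>>>             switch (guarantee) {
> >>>>>>>>>>>>>>>>>>>>>>>                 case "exactly_once":
> >>>>>>>>>>>>>>>>>>>>>>>                 case "exactly_once_beta":
> >>>>>>>>>>>>>>>>>>>>>>>                 case "exactly_once_v2":
> >>>>>>>>>>>>>>>>>>>>>>>                     return IsolationLevel.READ_COMMITTED;
> >>>>>>>>>>>>>>>>>>>>>>>                 default: // "at_least_once"
> >>>>>>>>>>>>>>>>>>>>>>>                     return IsolationLevel.READ_UNCOMMITTED;
> >>>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>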
> >>>>>>>>>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>>>>>>>>> Yeah, I'm not comfortable renaming the metrics in-place either, as
> >>>>>>>>>>>>>>>>>>>>>>> it's a backwards incompatible change. My concern is that, if we
> >>>>>>>>>>>>>>>>>>>>>>> leave the existing "flush" metrics in place, they will be confusing
> >>>>>>>>>>>>>>>>>>>>>>> to users. Right now, "flush" metrics record explicit flushes to
> >>>>>>>>>>>>>>>>>>>>>>> disk, but under KIP-892, even a commit() will not explicitly flush
> >>>>>>>>>>>>>>>>>>>>>>> data to disk - RocksDB will decide on when to flush memtables to
> >>>>>>>>>>>>>>>>>>>>>>> disk itself.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> If we keep the existing "flush" metrics, we'd have two options,
> >>>>>>>>>>>>>>>>>>>>>>> which both seem pretty bad to me:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     1. Have them record calls to commit(), which would be
> >>>>>>>>>>>>>>>>>>>>>>>        misleading, as data is no longer explicitly "flushed" to disk
> >>>>>>>>>>>>>>>>>>>>>>>        by this call.
> >>>>>>>>>>>>>>>>>>>>>>>     2. Have them record nothing at all, which is equivalent to
> >>>>>>>>>>>>>>>>>>>>>>>        removing the metrics, except that users will see the metric
> >>>>>>>>>>>>>>>>>>>>>>>        still exists and so assume that the metric is correct, and
> >>>>>>>>>>>>>>>>>>>>>>>        that there's a problem with their system when there isn't.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I agree that removing them is also a bad solution, and I'd like some
> >>>>>>>>>>>>>>>>>>>>>>> guidance on the best path forward here.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>>>>>>>>>> Position files are updated on every write to a StateStore. Since our
> >>>>>>>>>>>>>>>>>>>>>>> writes are now buffered until commit(), we can't update the Position
> >>>>>>>>>>>>>>>>>>>>>>> file until commit() has been called, otherwise it would be
> >>>>>>>>>>>>>>>>>>>>>>> inconsistent with the data in the event of a rollback. Consequently,
> >>>>>>>>>>>>>>>>>>>>>>> we need to manage these offsets the same way we manage the
> >>>>>>>>>>>>>>>>>>>>>>> checkpoint offsets, and ensure they're only written on commit().
> >>>>>>>>>>>>>>>>>>>>>>>
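> >>>>>>>>>>>>>>>>>>>>>>> One way to picture this (a sketch, not the KIP's implementation):
> >>>>>>>>>>>>>>>>>>>>>>> the position offsets are staged in memory alongside the write buffer
> >>>>>>>>>>>>>>>>>>>>>>> and only made durable in commit(), so a rollback discards both
> >>>>>>>>>>>>>>>>>>>>>>> together:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     import java.util.HashMap;
> >>>>>>>>>>>>>>>>>>>>>>>     import java.util.Map;
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     // Sketch: position offsets staged with uncommitted writes.
> >>>>>>>>>>>>>>>>>>>>>>>     public class PositionStagingSketch {
> >>>>>>>>>>>>>>>>>>>>>>>         // offsets observed by writes since the last commit
> >>>>>>>>>>>>>>>>>>>>>>>         private final Map<Integer, Long> staged = new HashMap<>();
> >>>>>>>>>>>>>>>>>>>>>>>         // offsets that have been made durable by commit()
> >>>>>>>>>>>>>>>>>>>>>>>         private final Map<Integer, Long> committed = new HashMap<>();
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         // called on every write; nothing is persisted yet
> >>>>>>>>>>>>>>>>>>>>>>>         public void recordWrite(int partition, long offset) {
> >>>>>>>>>>>>>>>>>>>>>>>             staged.merge(partition, offset, Math::max);
> >>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         // on Task commit: staged offsets become durable
> >>>>>>>>>>>>>>>>>>>>>>>         // atomically with the buffered data
> >>>>>>>>>>>>>>>>>>>>>>>         public void commit() {
> >>>>>>>>>>>>>>>>>>>>>>>             committed.putAll(staged);
> >>>>>>>>>>>>>>>>>>>>>>>             staged.clear();
> >>>>>>>>>>>>>>>>>>>>>>>             // ... persist committed together with the data ...
> >>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         // on rollback: staged offsets vanish with the data
> >>>>>>>>>>>>>>>>>>>>>>>         public void rollback() {
> >>>>>>>>>>>>>>>>>>>>>>>             staged.clear();
> >>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>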
> >>>>>>>>>>>>>>>>>>>>>>> 6.
> >>>>>>>>>>>>>>>>>>>>>>> Agreed, although I'm not exactly sure yet what tests to write. How
> >>>>>>>>>>>>>>>>>>>>>>> explicit do we need to be here in the KIP?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be seamless, and we
> >>>>>>>>>>>>>>>>>>>>>>> should definitely add some tests around that. Downgrade, it
> >>>>>>>>>>>>>>>>>>>>>>> transpires, isn't currently possible, as the extra column family for
> >>>>>>>>>>>>>>>>>>>>>>> offset storage is incompatible with the pre-KIP-892 implementation:
> >>>>>>>>>>>>>>>>>>>>>>> when you open a RocksDB database, you must open all available column
> >>>>>>>>>>>>>>>>>>>>>>> families or receive an error. What currently happens on downgrade is
> >>>>>>>>>>>>>>>>>>>>>>> that it attempts to open the store, throws an error about the
> >>>>>>>>>>>>>>>>>>>>>>> offsets column family not being opened, which triggers a wipe and
> >>>>>>>>>>>>>>>>>>>>>>> rebuild of the Task. Given that downgrades should be uncommon, I
> >>>>>>>>>>>>>>>>>>>>>>> think this is acceptable behaviour, as the end-state is consistent,
> >>>>>>>>>>>>>>>>>>>>>>> even if it results in an undesirable state restore.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Should I document the upgrade/downgrade behaviour explicitly in the
> >>>>>>>>>>>>>>>>>>>>>>> KIP?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Nick!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>>> Why does StateStore#flush() default to
> >>>>>>>>>>>>>>>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> >>>>>>>>>>>>>>>>>>>>>>>> Since calls to flush() will not exist anymore after this KIP is
> >>>>>>>>>>>>>>>>>>>>>>>> released, I would rather throw an unsupported operation exception by
> >>>>>>>>>>>>>>>>>>>>>>>> default.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>> When would a state store return -1 from
> >>>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while being
> >>>>>>>>>>>>>>>>>>>>>>>> transactional?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Wouldn't StateStore#approximateNumUncommittedBytes() also return 0
> >>>>>>>>>>>>>>>>>>>>>>>> if the state store is transactional but nothing has been written to
> >>>>>>>>>>>>>>>>>>>>>>>> the state store yet?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>> Sorry for bringing this up again. Does this KIP really need to
> >>>>>>>>>>>>>>>>>>>>>>>> introduce StateStoreContext#isolationLevel()? StateStoreContext
> >>>>>>>>>>>>>>>>>>>>>>>> already has appConfigs(), which basically exposes the same
> >>>>>>>>>>>>>>>>>>>>>>>> information, i.e., if EOS is enabled or not.
> >>>>>>>>>>>>>>>>>>>>>>>> In one of your previous e-mails you wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> "My idea was to try to keep the StateStore interface as loosely
> >>>>>>>>>>>>>>>>>>>>>>>> coupled from the Streams engine as possible, to give implementers
> >>>>>>>>>>>>>>>>>>>>>>>> more freedom, and reduce the amount of internal knowledge required."
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> While I understand the intent, I doubt that it decreases the
> >>>>>>>>>>>>>>>>>>>>>>>> coupling of a StateStore interface and the Streams engine.
> >>>>>>>>>>>>>>>>>>>>>>>> READ_COMMITTED only applies to IQ but not to reads by processors.
> >>>>>>>>>>>>>>>>>>>>>>>> Thus, implementers need to understand how Streams accesses the state
> >>>>>>>>>>>>>>>>>>>>>>>> stores.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I would like to hear what others think about this.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>>>>>>>>>> Great exposing new metrics for transactional state stores! However,
> >>>>>>>>>>>>>>>>>>>>>>>> I would prefer to add new metrics and deprecate (in the docs) the
> >>>>>>>>>>>>>>>>>>>>>>>> old ones. You can find examples of deprecated metrics here:
> >>>>>>>>>>>>>>>>>>>>>>>> https://kafka.apache.org/documentation/#selector_monitoring
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>>>>>>>>>>> Why does the KIP mention position files? I do not think they are
> >>>>>>>>>>>>>>>>>>>>>>>> related to transactions or flushes.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 6.
> >>>>>>>>>>>>>>>>>>>>>>>> I think we will also need to adapt/add integration tests besides
> >>>>>>>>>>>>>>>>>>>>>>>> unit tests. Additionally, we probably need integration or system
> >>>>>>>>>>>>>>>>>>>>>>>> tests to verify that upgrades and downgrades between transactional
> >>>>>>>>>>>>>>>>>>>>>>>> and non-transactional state stores work as expected.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> One more thing: I noted John's suggestion in the KIP, under
> >>>>>>>>>>>>>>>>>>>>>>>>> "Rejected Alternatives". I still think it's an idea worth pursuing,
> >>>>>>>>>>>>>>>>>>>>>>>>> but I believe that it's out of the scope of this KIP, because it
> >>>>>>>>>>>>>>>>>>>>>>>>> solves a different set of problems to this KIP, and the scope of
> >>>>>>>>>>>>>>>>>>>>>>>>> this one has already grown quite large!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I've updated the KIP (
> >>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>>>>>>>>>>> ) with the latest changes; mostly bringing back "Atomic
> >>>>>>>>>>>>>>>>>>>>>>>>>> Checkpointing" (for what feels like the 10th time!). I think the
> >>>>>>>>>>>>>>>>>>>>>>>>>> one thing missing is some changes to metrics (notably the store
> >>>>>>>>>>>>>>>>>>>>>>>>>> "flush" metrics will need to be renamed to "commit").
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The reason I brought back Atomic Checkpointing was to decouple
> >>>>>>>>>>>>>>>>>>>>>>>>>> store flush from store commit. This is important, because with
> >>>>>>>>>>>>>>>>>>>>>>>>>> Transactional StateStores, we now need to call "flush" on *every*
> >>>>>>>>>>>>>>>>>>>>>>>>>> Task commit, and not just when the StateStore is closing,
> >>>>>>>>>>>>>>>>>>>>>>>>>> otherwise our transaction buffer will never be written and
> >>>>>>>>>>>>>>>>>>>>>>>>>> persisted, instead growing unbounded! I experimented with some
> >>>>>>>>>>>>>>>>>>>>>>>>>> simple solutions, like forcing a store flush whenever the
> >>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer was likely to exceed its configured size, but
> >>>>>>>>>>>>>>>>>>>>>>>>>> this was brittle: it prevented the transaction buffer from being
> >>>>>>>>>>>>>>>>>>>>>>>>>> configured to be unbounded, and it still would have required
> >>>>>>>>>>>>>>>>>>>>>>>>>> explicit flushes of RocksDB, yielding sub-optimal performance and
> >>>>>>>>>>>>>>>>>>>>>>>>>> memory utilization.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right" way to resolve
> >>>>>>>>>>>>>>>>>>>>>>>>>> this problem. By ensuring that the changelog offsets that
> >>>>>>>>>>>>>>>>>>>>>>>>>> correspond to the most recently written records are always
> >>>>>>>>>>>>>>>>>>>>>>>>>> atomically written to the StateStore (by writing them to the same
> >>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer), we can avoid forcibly flushing the RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>> memtables to disk, letting RocksDB flush them only when necessary,
> >>>>>>>>>>>>>>>>>>>>>>>>>> without losing any of our consistency guarantees. See the
> >>>>>>>>>>>>>>>>>>>>>>>>>> updated KIP for more info.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
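> >>>>>>>>>>>>>>>>>>>>>>>>>> The core of that trick can be sketched with the RocksDB Java API
> >>>>>>>>>>>>>>>>>>>>>>>>>> like this (column family names are illustrative; this is not the
> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP's actual code):
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import java.nio.ByteBuffer;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import java.nio.charset.StandardCharsets;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.ColumnFamilyHandle;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDB;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDBException;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.WriteBatchWithIndex;
> >>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.WriteOptions;
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>     // Sketch (illustrative): each record and its changelog
> >>>>>>>>>>>>>>>>>>>>>>>>>>     // offset go into the same write batch, so RocksDB persists
> >>>>>>>>>>>>>>>>>>>>>>>>>>     // them atomically and memtables never need a forced flush
> >>>>>>>>>>>>>>>>>>>>>>>>>>     // to stay consistent.
> >>>>>>>>>>>>>>>>>>>>>>>>>>     public class AtomicCheckpointSketch {
> >>>>>>>>>>>>>>>>>>>>>>>>>>         private static final byte[] OFFSET_KEY =
> >>>>>>>>>>>>>>>>>>>>>>>>>>             "changelog-offset".getBytes(StandardCharsets.UTF_8);
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>         private final RocksDB db;
> >>>>>>>>>>>>>>>>>>>>>>>>>>         private final ColumnFamilyHandle dataCf;    // records
> >>>>>>>>>>>>>>>>>>>>>>>>>>         private final ColumnFamilyHandle offsetsCf; // offsets
> >>>>>>>>>>>>>>>>>>>>>>>>>>         private final WriteBatchWithIndex buffer =
> >>>>>>>>>>>>>>>>>>>>>>>>>>             new WriteBatchWithIndex(true);
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>         public AtomicCheckpointSketch(final RocksDB db,
> >>>>>>>>>>>>>>>>>>>>>>>>>>                                       final ColumnFamilyHandle dataCf,
> >>>>>>>>>>>>>>>>>>>>>>>>>>                                       final ColumnFamilyHandle offsetsCf) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>             this.db = db;
> >>>>>>>>>>>>>>>>>>>>>>>>>>             this.dataCf = dataCf;
> >>>>>>>>>>>>>>>>>>>>>>>>>>             this.offsetsCf = offsetsCf;
> >>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>         public void put(final byte[] key, final byte[] value,
> >>>>>>>>>>>>>>>>>>>>>>>>>>                         final long changelogOffset) throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>             buffer.put(dataCf, key, value);
> >>>>>>>>>>>>>>>>>>>>>>>>>>             // the offset rides along in the same (uncommitted) buffer
> >>>>>>>>>>>>>>>>>>>>>>>>>>             buffer.put(offsetsCf, OFFSET_KEY,
> >>>>>>>>>>>>>>>>>>>>>>>>>>                 ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
> >>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>         public void commit() throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>             try (WriteOptions options = new WriteOptions()) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>                 db.write(options, buffer); // atomic: data + offset
> >>>>>>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>>>>>             buffer.clear();
> >>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>>>>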
> >>>>>>>>>>>>>>>>>>>>>>>>>> I have fully implemented these changes, although I'm still not
> >>>>>>>>>>>>>>>>>>>>>>>>>> entirely happy with the implementation for segmented StateStores,
> >>>>>>>>>>>>>>>>>>>>>>>>>> so I plan to refactor that. Despite that, all tests pass. If you'd
> >>>>>>>>>>>>>>>>>>>>>>>>>> like to try out or review this highly experimental and incomplete
> >>>>>>>>>>>>>>>>>>>>>>>>>> branch, it's available here:
> >>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
> >>>>>>>>>>>>>>>>>>>>>>>>>> Note: it's built against Kafka 3.5.0 so that I had a stable base
> >>>>>>>>>>>>>>>>>>>>>>>>>> to build and test it on, and to enable easy apples-to-apples
> >>>>>>>>>>>>>>>>>>>>>>>>>> comparisons in a live environment. I plan to rebase it against
> >>>>>>>>>>>>>>>>>>>>>>>>>> trunk once it's nearer completion and has been proven on our main
> >>>>>>>>>>>>>>>>>>>>>>>>>> application.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I would really appreciate help in reviewing and testing:
> >>>>>>>>>>>>>>>>>>>>>>>>>> - Segmented (Versioned, Session and Window) stores
> >>>>>>>>>>>>>>>>>>>>>>>>>> - Global stores
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> As I do not currently use either of these, my primary test
> >>>>>>>>>>>>>>>>>>>>>>>>>> environment doesn't test these areas.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I'm going on Parental Leave starting next week for a few weeks, so
> >>>>>>>>>>>>>>>>>>>>>>>>>> will not have time to move this forward until late August. That
> >>>>>>>>>>>>>>>>>>>>>>>>>> said, your feedback is welcome and appreciated, I just won't be
> >>>>>>>>>>>>>>>>>>>>>>>>>> able to respond as quickly as usual.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ is not something
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I had considered.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> What about atomically updating the state store from the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer every commit interval and writing the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint (thus, flushing the memtable) every configured
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> amount of data and/or number of commit intervals?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting that we add an
> >>>>>>>>>>>>>>>>>>>>>>>>>>> additional config for the max number of commit intervals between
> >>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoints? That way, we would checkpoint *either* when the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffers are nearly full, *OR* whenever a certain
> >>>>>>>>>>>>>>>>>>>>>>>>>>> number of commit intervals have elapsed, whichever comes first?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> That certainly seems reasonable, although this re-ignites an
> >>>>>>>>>>>>>>>>>>>>>>>>>>> earlier debate about whether a config should be measured in
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "number of commit intervals", instead of just an absolute time.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> FWIW, I realised that this issue is the reason I was pursuing the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Atomic Checkpoints, as it de-couples memtable flush from
> >>>>>>>>>>>>>>>>>>>>>>>>>>> checkpointing, which enables us to just checkpoint on every
> >>>>>>>>>>>>>>>>>>>>>>>>>>> commit without any performance impact. Atomic Checkpointing is
> >>>>>>>>>>>>>>>>>>>>>>>>>>> definitely the "best" solution, but I'm not sure if this is
> >>>>>>>>>>>>>>>>>>>>>>>>>>> enough to bring it back into this KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I'm currently working on moving all the transactional logic
> >>>>>>>>>>>>>>>>>>>>>>>>>>> directly into RocksDBStore itself, which does away with the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> StateStore#newTransaction method, and reduces the number of new
> >>>>>>>>>>>>>>>>>>>>>>>>>>> classes introduced, significantly reducing the complexity. If it
> >>>>>>>>>>>>>>>>>>>>>>>>>>> works, and the complexity is drastically reduced, I may try
> >>>>>>>>>>>>>>>>>>>>>>>>>>> bringing back Atomic Checkpoints into this KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> As far as I understand, you want to atomically update the state
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> store from the transaction buffer, flush the memtable of a state
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> store and write the checkpoint not after the commit time elapsed
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> but after the transaction buffer reached a size that would lead
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> to exceeding statestore.transaction.buffer.max.bytes before the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> next commit interval ends.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> That means, the Kafka transaction would commit every commit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> interval but the state store will only be atomically updated
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> roughly every statestore.transaction.buffer.max.bytes of data.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Also IQ would then only see new data roughly every
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> After a failure the state store needs to restore up to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Is this correct?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> What about atomically updating the state store from the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer every commit interval and writing the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint (thus, flushing the memtable) every configured amount
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> of data and/or number of commit intervals? In such a way, we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> would have the same delay for records appearing in output topics
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> and IQ because both would appear when the Kafka transaction is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> committed. However, after a failure the state store still needs
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> to restore up to statestore.transaction.buffer.max.bytes and it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> might restore data that is already in the state store because
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> the checkpoint lags behind the last stable offset (i.e. the last
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> committed offset) of the changelog topics. Restoring data that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> is already in the state store is idempotent, so EOS should not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> be violated.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> This solution needs at least one new config to specify when a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint should be written.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
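> >>>>>>>>>>>>>>>>>>>>>>>>>>>> A sketch of the decision logic described above (config names and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> thresholds are purely illustrative):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Sketch: the store is updated from the transaction buffer
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     // on every commit interval, but the checkpoint (and memtable
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     // flush) only happens after enough data or enough commit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     // intervals have accumulated.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     public class CheckpointPolicySketch {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final long maxBytesBetweenCheckpoints;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final int maxIntervalsBetweenCheckpoints;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         private long bytesSinceLastCheckpoint = 0;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         private int intervalsSinceLastCheckpoint = 0;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         public CheckpointPolicySketch(long maxBytes, int maxIntervals) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.maxBytesBetweenCheckpoints = maxBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.maxIntervalsBetweenCheckpoints = maxIntervals;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         // called once per commit interval, after the state store
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         // has been atomically updated from the transaction buffer
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         public boolean shouldCheckpoint(long bytesWrittenThisInterval) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             bytesSinceLastCheckpoint += bytesWrittenThisInterval;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             intervalsSinceLastCheckpoint++;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             return bytesSinceLastCheckpoint >= maxBytesBetweenCheckpoints
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>                 || intervalsSinceLastCheckpoint >= maxIntervalsBetweenCheckpoints;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         public void onCheckpoint() {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             bytesSinceLastCheckpoint = 0;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>             intervalsSinceLastCheckpoint = 0;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>     }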
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> A small correction to your previous e-mail that does not change
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> anything you said: Under ALOS the default commit interval is 30
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> seconds, not five seconds.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've begun performance testing my branch on our staging
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> environment, putting it through its paces in our non-trivial
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> application. I'm already observing the same increased flush rate
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that we saw the last time we attempted to use a version of this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP, but this time, I think I know why.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of the Task commit process, has the following behaviour:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     - Under ALOS: checkpoint the state stores. This includes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       flushing memtables in RocksDB. This is acceptable because
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       the default commit.interval.ms is 5 seconds, so forcibly
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       flushing memtables every 5 seconds is acceptable for most
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       applications.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     - Under EOS: checkpointing is not done, *unless* it's being
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       forced, due to e.g. the Task closing or being revoked. This
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       means that under normal processing conditions, the state
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       stores will not be checkpointed, and will not have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       memtables flushed at all, unless RocksDB decides to flush
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       them on its own. Checkpointing stores and force-flushing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       their memtables is only done when a Task is being closed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least *some*
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> normal Task commits, in order to write the RocksDB transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffers to the state stores, and to ensure the offsets are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> synced to disk to prevent restores from getting out of hand.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Consequently, my current implementation calls maybeCheckpoint on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> *every* Task commit, which is far too frequent. This causes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoints every 10,000 records, which is a change in flush
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour, potentially causing performance problems for some
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> applications.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm currently leaning
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> towards using the statestore.transaction.buffer.max.bytes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration to checkpoint Tasks once we are likely to exceed
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> it. This would complement the existing "early Task commit"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> functionality that this configuration provides, in the following
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> way (sketched below):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     - Currently, we use statestore.transaction.buffer.max.bytes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       to force an early Task commit if processing more records
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       would cause our state store transactions to exceed the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       memory assigned to them.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     - New functionality: when a Task *does* commit, we will not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       checkpoint the stores (and hence flush the transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       buffers) unless we expect to cross the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       statestore.transaction.buffer.max.bytes threshold before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>       the next commit.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
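> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Roughly, the intended condition (a sketch, assuming a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> hypothetical running estimate of bytes written per commit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> interval):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Sketch: checkpoint on commit only when the transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // buffer is expected to cross its configured maximum before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // the next commit, instead of on every commit.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public class MaybeCheckpointSketch {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // statestore.transaction.buffer.max.bytes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final long maxBufferBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public MaybeCheckpointSketch(long maxBufferBytes) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.maxBufferBytes = maxBufferBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // uncommittedBytes: current size of the transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // buffer; estimatedBytesPerInterval: hypothetical
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // estimate of bytes written per commit interval
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public boolean shouldCheckpoint(long uncommittedBytes,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>                                         long estimatedBytesPerInterval) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             return uncommittedBytes + estimatedBytesPerInterval
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 >= maxBufferBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>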
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm also open to suggestions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> By "less predictable for users", I meant in terms of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understanding the performance profile under various
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> circumstances. The more complex the solution, the more difficult
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it would be for users to understand the performance they see.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, spilling records to disk when the transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer reaches a threshold would, I expect, reduce write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> throughput. This reduction in write throughput could be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> unexpected, and potentially difficult to diagnose/understand for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At the moment, I think the "early commit" concept is relatively
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> straightforward; it's easy to document, and conceptually fairly
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> obvious to users. We could probably add a metric to make it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> easier to understand when it happens though.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. (the second one)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an indirect way of telling
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> StateStores whether they should be transactional.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> READ_COMMITTED essentially requires transactions, because it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dictates that two threads calling `newTransaction()` should not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> see writes from the other transaction until they have been
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> committed. With READ_UNCOMMITTED, all bets are off, and stores
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can allow threads to observe written records at any time, which
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is essentially "no transactions". That said, StateStores are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> free to implement these guarantees however they can, which is a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> bit more relaxed than dictating "you must use transactions". For
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example, with RocksDB we would implement these as READ_COMMITTED
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> == WBWI-based "transactions", READ_UNCOMMITTED == direct writes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the database. But with other storage engines, it might be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> preferable to *always* use transactions, even when unnecessary;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or there may be storage engines that don't provide transactions,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but the isolation guarantees can be met using a different
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> technique.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore interface as loosely
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> coupled from the Streams engine as possible, to give
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementers more freedom, and reduce the amount of internal
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> knowledge required.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> That said, I understand that "IsolationLevel" might not be the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right abstraction, and we can always make it much more explicit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if required, e.g. boolean transactional()
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
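> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For anyone unfamiliar with WBWI, a small sketch of how the two
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> isolation levels could map onto the RocksDB Java API
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (illustrative, not the actual KIP-892 code):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.ReadOptions;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDB;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.RocksDBException;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.WriteBatchWithIndex;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.WriteOptions;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Sketch: READ_COMMITTED buffers writes in a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // WriteBatchWithIndex and only applies them on commit;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // READ_UNCOMMITTED writes straight through.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public class IsolationSketch {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final RocksDB db;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final boolean readCommitted;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final WriteBatchWithIndex wbwi =
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             new WriteBatchWithIndex(true);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public IsolationSketch(RocksDB db, boolean readCommitted) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.db = db;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.readCommitted = readCommitted;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public void put(byte[] key, byte[] value) throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             if (readCommitted) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 wbwi.put(key, value);  // buffered, not yet visible
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             } else {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 db.put(key, value);    // immediately visible
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // the writing thread still reads its own uncommitted
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // writes by consulting the batch first, falling back
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // to the database
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public byte[] get(byte[] key) throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             if (readCommitted) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 try (ReadOptions readOptions = new ReadOptions()) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     return wbwi.getFromBatchAndDB(db, readOptions, key);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             return db.get(key);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         public void commit() throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             if (readCommitted) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 try (WriteOptions options = new WriteOptions()) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     db.write(options, wbwi);  // now visible
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 wbwi.clear();
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>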
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 7-8.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I can make these changes either later today or tomorrow.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Small update:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of issues that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> needed addressing. Currently, all the tests pass, which is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> promising, but it will need to undergo some performance testing.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I haven't (yet) worked on removing the `newTransaction()` stuff,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but I would expect that, behaviourally, it should make no
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> difference. The branch is available at
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interested in taking an early look.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with John here, that actually it is public API. My question is how this usage pattern affects normal processing.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My concern is that checking for the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean by "less predictable for users", though.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
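> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (For illustration, a minimal sketch of such a size check; all names here are hypothetical, and the engine still has to act on the result, which is exactly the spill-over described above:)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Tracks uncommitted bytes in a store's transaction buffer and tells
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // the engine when it should commit before applying the next write.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     class TransactionBufferGuard {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private final long maxBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         private long uncommittedBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         TransactionBufferGuard(final long maxBytes) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             this.maxBytes = maxBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         boolean shouldCommitBefore(final long recordBytes) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             return uncommittedBytes + recordBytes > maxBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         void onWrite(final long recordBytes) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             uncommittedBytes += recordBytes;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         void onCommit() {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             uncommittedBytes = 0;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }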
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Some comments on your reply to my e-mail on June 20th:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional or not. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
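> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (A one-line sketch of that rule, assuming the explicit transactional() flag floated earlier in the thread; the method and parameter names are hypothetical:)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Under ALOS a checkpoint is always safe to write; under EOS it is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // only safe when the store is transactional, because uncommitted
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // records can then never reach the files the checkpoint describes.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     static boolean shouldWriteCheckpoint(final boolean eosEnabled, final boolean storeIsTransactional) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return !eosEnabled || storeIsTransactional;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>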
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 7.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My point was about not only considering the bytes in memory in config statestore.uncommitted.max.bytes, but also bytes that might be spilled on disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the name statestore.transaction.buffer.max.bytes that you proposed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
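> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (For example, the proposed name would be set like any other Streams config; the 64 MiB value below is purely illustrative:)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     final Properties props = new Properties();
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Bound the bytes buffered in uncommitted state-store transactions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     props.put("statestore.transaction.buffer.max.bytes", "67108864");
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>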
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 8.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure. For instance, how does a transactional state store recover after a failure, and what happens with the transaction buffer, etc.? (That is what I meant by "fail-over" in point 9.)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) the configured memory needs to trigger an early commit?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance. All 3 of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is growing a bit out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
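> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (For anyone unfamiliar with WriteBatchWithIndex, a rough sketch of the RocksDB Java API pattern such an implementation detail would build on; this is not the KIP's actual code:)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     import org.rocksdb.*;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     static void writeReadCommit(final RocksDB db, final byte[] key, final byte[] value) throws RocksDBException {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         try (final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              final ReadOptions readOptions = new ReadOptions();
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              final WriteOptions writeOptions = new WriteOptions()) {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             batch.put(key, value);  // buffered, uncommitted write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             // reads overlay the uncommitted batch on the committed DB state
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             final byte[] current = batch.getFromBatchAndDB(db, readOptions, key);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             db.write(writeOptions, batch);  // atomically commit the batch
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             batch.clear();                  // reset for the next transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }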
