Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to try to simplify my original design so that it needs no (or minimal) changes to the public interface.
If that succeeds, then it should also be possible to transparently implement the "no memtables" solution as a performance optimization when the record cache is enabled. I consider this approach only an optimisation, because of the need to still support stores with the cache disabled. For that reason, I think the "no memtables" approach would probably best be suited as a follow-up KIP, but that we keep it in mind during the design of this one. What do you think? Regards, Nick On Tue, 20 Jun 2023, 22:26 John Roesler, <vvcep...@apache.org> wrote: > Oh, that's a good point. > > On the topic of a behavioral switch for disabled caches, the typical use > case for disabling the cache is to cause each individual update to > propagate down the topology, so another thought might be to just go > ahead and add the memory we would have used for the memtables to the > cache size, but if people did disable the cache entirely, then we could > still go ahead and forward the records on each write? > > I know that Guozhang was also proposing for a while to actually decouple > caching and forwarding, which might provide a way to side-step this > dilemma (i.e., we just always forward and only apply the cache to state > and changelog writes). > > By the way, I'm basing my statement on why you'd disable caches on > memory, but also on the guidance here: > > https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html > . That doc also contains a section on how to bound the total memory > usage across RocksDB memtables, which points to another benefit of > disabling memtables and managing the write buffer ourselves (simplified > memory configuration). > > Thanks, > -John > > On 6/20/23 16:05, Nick Telford wrote: > > Potentially we could just go the memorable with Rocks WriteBatches route > if > > the cache is disabled? > > > > On Tue, 20 Jun 2023, 22:00 John Roesler, <j...@vvcephei.org> wrote: > > > >> Touché! > >> > >> Ok, I agree that figuring out the case of a disabled cache would be > >> non-trivial. Ingesting single-record SST files will probably not be > >> performant, but benchmarking may prove different. Or maybe we can have > >> some reserved cache space on top of the user-configured cache, which we > >> would have reclaimed from the memtable space. Or some other, more > >> creative solution. > >> > >> Thanks, > >> -John > >> > >> On 6/20/23 15:30, Nick Telford wrote: > >>>> Note that users can disable the cache, which would still be > >>> ok, I think. We wouldn't ingest the SST files on every record, but just > >>> append to them and only ingest them on commit, when we're already > >>> waiting for acks and a RocksDB commit. > >>> > >>> In this case, how would uncommitted records be read by joins? > >>> > >>> On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote: > >>> > >>>> Ah, sorry Nick, > >>>> > >>>> I just meant the regular heap based cache that we maintain in > Streams. I > >>>> see that it's not called "RecordCache" (my mistake). 
> >>>> > >>>> The actual cache is ThreadCache: > >>>> > >>>> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java > >>>> > >>>> Here's the example of how we use the cache in KeyValueStore: > >>>> > >>>> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java > >>>> > >>>> It's basically just an on-heap Map of records that have not yet been > >>>> written to the changelog or flushed into the underlying store. It gets > >>>> flushed when the total cache size exceeds `cache.max.bytes.buffering` > or > >>>> the `commit.interval.ms` elapses. > >>>> > >>>> Speaking of those configs, another benefit to this idea is that we > would > >>>> no longer need to trigger extra commits based on the size of the > ongoing > >>>> transaction. Instead, we'd just preserve the existing cache-flush > >>>> behavior. Note that users can disable the cache, which would still be > >>>> ok, I think. We wouldn't ingest the SST files on every record, but > just > >>>> append to them and only ingest them on commit, when we're already > >>>> waiting for acks and a RocksDB commit. > >>>> > >>>> Thanks, > >>>> -John > >>>> > >>>> On 6/20/23 14:09, Nick Telford wrote: > >>>>> Hi John, > >>>>> > >>>>> By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find > >> any > >>>>> class called "RecordCache"... > >>>>> > >>>>> Cheers, > >>>>> > >>>>> Nick > >>>>> > >>>>> On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> > >> wrote: > >>>>> > >>>>>> Hi Nick, > >>>>>> > >>>>>> Thanks for picking this up again! > >>>>>> > >>>>>> I did have one new thought over the intervening months, which I'd > like > >>>>>> your take on. > >>>>>> > >>>>>> What if, instead of using the RocksDB atomic write primitive at all, > >> we > >>>>>> instead just: > >>>>>> 1. disable memtables entirely > >>>>>> 2. directly write the RecordCache into SST files when we flush > >>>>>> 3. atomically ingest the SST file(s) into RocksDB when we get the > ACK > >>>>>> from the changelog (see > >>>>>> > >>>>>> > >>>> > >> > https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md > >>>>>> and > >>>>>> > >>>>>> > >>>> > >> > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java > >>>>>> and > >>>>>> > >>>>>> > >>>> > >> > https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429 > >>>>>> ) > >>>>>> 4. track the changelog offsets either in another CF or the same CF > >> with > >>>>>> a reserved key, either of which will make the changelog offset > update > >>>>>> atomic with the file ingestions > >>>>>> > >>>>>> I suspect this'll have a number of benefits: > >>>>>> * writes to RocksDB will always be atomic > >>>>>> * we don't fragment memory between the RecordCache and the memtables > >>>>>> * RecordCache gives far higher performance than memtable for reads > and > >>>>>> writes > >>>>>> * we don't need any new "transaction" concepts or memory bound > configs > >>>>>> > >>>>>> What do you think? > >>>>>> > >>>>>> Thanks, > >>>>>> -John > >>>>>> > >>>>>> On 6/20/23 10:51, Nick Telford wrote: > >>>>>>> Hi Bruno, > >>>>>>> > >>>>>>> Thanks for reviewing the KIP. 
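(For reference, a minimal illustration of the SST-file approach John sketches above -- write buffered records to an SST file, then atomically ingest it once the changelog write is acknowledged -- using RocksDB's Java API. Paths, keys and class names here are made up; this is not the KIP's implementation.)

    import java.util.Collections;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    public class SstIngestSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/example-db")) {

                // Step 2: write the buffered (cached) records out as an SST file.
                // Keys must be added in ascending order of the comparator.
                try (EnvOptions envOptions = new EnvOptions();
                     SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                    writer.open("/tmp/flush-000001.sst");
                    writer.put("key-a".getBytes(), "value-a".getBytes());
                    writer.put("key-b".getBytes(), "value-b".getBytes());
                    writer.finish();
                }

                // Step 3: once the changelog ACK arrives, atomically ingest the file.
                try (IngestExternalFileOptions ingestOptions =
                         new IngestExternalFileOptions().setMoveFiles(true)) {
                    db.ingestExternalFile(
                        Collections.singletonList("/tmp/flush-000001.sst"), ingestOptions);
                }
            }
        }
    }
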
It's been a long road, I started > >> working > >>>> on > >>>>>>> this more than a year ago, and most of the time in the last 6 > months > >>>> has > >>>>>>> been spent on the "Atomic Checkpointing" stuff that's been benched, > >> so > >>>>>> some > >>>>>>> of the reasoning behind some of my decisions have been lost, but > I'll > >>>> do > >>>>>> my > >>>>>>> best to reconstruct them. > >>>>>>> > >>>>>>> 1. > >>>>>>> IIRC, this was the initial approach I tried. I don't remember the > >> exact > >>>>>>> reasons I changed it to use a separate "view" of the StateStore > that > >>>>>>> encapsulates the transaction, but I believe it had something to do > >> with > >>>>>>> concurrent access to the StateStore from Interactive Query threads. > >>>> Reads > >>>>>>> from interactive queries need to be isolated from the currently > >> ongoing > >>>>>>> transaction, both for consistency (so interactive queries don't > >> observe > >>>>>>> changes that are subsequently rolled-back), but also to prevent > >>>> Iterators > >>>>>>> opened by an interactive query from being closed and invalidated by > >> the > >>>>>>> StreamThread when it commits the transaction, which causes your > >>>>>> interactive > >>>>>>> queries to crash. > >>>>>>> > >>>>>>> Another reason I believe I implemented it this way was a separation > >> of > >>>>>>> concerns. Recall that newTransaction() originally created an object > >> of > >>>>>> type > >>>>>>> Transaction, not StateStore. My intent was to improve the > type-safety > >>>> of > >>>>>>> the API, in an effort to ensure Transactions weren't used > >> incorrectly. > >>>>>>> Unfortunately, this didn't pan out, but newTransaction() remained. > >>>>>>> > >>>>>>> Finally, this had the added benefit that implementations could > easily > >>>> add > >>>>>>> support for transactions *without* re-writing their existing, > >>>>>>> non-transactional implementation. I think this can be a benefit > both > >>>> for > >>>>>>> implementers of custom StateStores, but also for anyone extending > >>>>>>> RocksDbStore, as they can rely on the existing access methods > working > >>>> how > >>>>>>> they expect them to. > >>>>>>> > >>>>>>> I'm not too happy with the way the current design has panned out, > so > >>>> I'm > >>>>>>> open to ideas on how to improve it. Key to this is finding some way > >> to > >>>>>>> ensure that reads from Interactive Query threads are properly > >> isolated > >>>>>> from > >>>>>>> the transaction, *without* the performance overhead of checking > which > >>>>>>> thread the method is being called from on every access. > >>>>>>> > >>>>>>> As for replacing flush() with commit() - I saw no reason to add > this > >>>>>>> complexity to the KIP, unless there was a need to add arguments to > >> the > >>>>>>> flush/commit method. This need arises with Atomic Checkpointing, > but > >>>> that > >>>>>>> will be implemented separately, in a future KIP. Do you see a need > >> for > >>>>>> some > >>>>>>> arguments to the flush/commit method that I've missed? Or were you > >>>> simply > >>>>>>> suggesting a rename? > >>>>>>> > >>>>>>> 2. > >>>>>>> This is simply due to the practical reason that isolationLevel() is > >>>>>> really > >>>>>>> a proxy for checking if the app is under EOS. The application > >>>>>> configuration > >>>>>>> is not provided to the constructor of StateStores, but it *is* > >> provided > >>>>>> to > >>>>>>> init(), via StateStoreContext. For this reason, it seemed somewhat > >>>>>> natural > >>>>>>> to add it to StateStoreContext. 
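(As an aside, the derivation being described -- isolation level determined by the application's processing mode and surfaced to stores via their context rather than their constructors -- could look roughly like this. A hypothetical sketch using the existing org.apache.kafka.common.IsolationLevel enum for illustration; it is not the KIP's actual API.)

    import java.util.Map;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.StreamsConfig;

    // Hypothetical helper: derive the isolation level a store should honour
    // from the app's processing.guarantee, as a StateStoreContext accessor might.
    final class IsolationLevels {
        static IsolationLevel fromAppConfigs(final Map<String, Object> appConfigs) {
            final Object guarantee = appConfigs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            // Older EOS aliases omitted for brevity.
            return StreamsConfig.EXACTLY_ONCE_V2.equals(guarantee)
                    ? IsolationLevel.READ_COMMITTED    // under EOS
                    : IsolationLevel.READ_UNCOMMITTED; // under ALOS
        }
    }
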
I think this makes sense, since the > >>>>>>> IsolationLevel of all StateStores in an application *must* be the > >> same, > >>>>>> and > >>>>>>> since those stores are all initialized with the same > >> StateStoreContext, > >>>>>> it > >>>>>>> seems natural for that context to carry the desired IsolationLevel > to > >>>>>> use. > >>>>>>> > >>>>>>> 3. > >>>>>>> Using IsolationLevel instead of just passing `boolean eosEnabled`, > >> like > >>>>>>> much of the internals was an attempt to logically de-couple the > >>>>>> StateStore > >>>>>>> API from the internals of Kafka Streams. Technically, StateStores > >> don't > >>>>>>> need to know/care what processing mode the KS app is using, all > they > >>>> need > >>>>>>> to know is the isolation level expected of them. > >>>>>>> > >>>>>>> Having formal definitions for the expectations of the two required > >>>>>>> IsolationLevels allow implementers to implement transactional > stores > >>>>>>> without having to dig through the internals of Kafka Streams and > >>>>>> understand > >>>>>>> exactly how they are used. The tight coupling between state stores > >> and > >>>>>>> internal behaviour has actually significantly hindered my progress > on > >>>>>> this > >>>>>>> KIP, and encouraged me to avoid increasing this logical coupling as > >>>> much > >>>>>> as > >>>>>>> possible. > >>>>>>> > >>>>>>> This also frees implementations to satisfy those requirements in > any > >>>> way > >>>>>>> they choose. Transactions might not be the only/available approach > to > >>>> an > >>>>>>> implementation, but they might have an alternative way to satisfy > the > >>>>>>> isolation requirements. I admit that this point is more about > >>>> semantics, > >>>>>>> but "transactional" would need to be formally defined in order for > >>>>>>> implementers to provide a valid implementation, and these > >>>> IsolationLevels > >>>>>>> provide that formal definition. > >>>>>>> > >>>>>>> 4. > >>>>>>> I can remove them. I added them only as I planned to include them > in > >>>> the > >>>>>>> org.apache.kafka.streams.state package, as a recommended base > >>>>>>> implementation for all StateStores, including those implemented by > >>>>>> users. I > >>>>>>> had assumed that anything in "public" packages, such as > >>>>>>> org.apache.kafka.streams.state, should be included in a KIP. Is > that > >>>>>> wrong? > >>>>>>> > >>>>>>> 5. > >>>>>>> RocksDB provides no way to measure the actual size of a > >>>>>>> WriteBatch(WithIndex), so we're limited to tracking the sum total > of > >>>> the > >>>>>>> size of keys + values that are written to the transaction. This > >>>> obviously > >>>>>>> under-estimates the actual memory usage, because WriteBatch > no-doubt > >>>>>>> includes some record overheads, and WriteBatchWithIndex has to > >> maintain > >>>>>> an > >>>>>>> index. > >>>>>>> > >>>>>>> Ideally, we could trivially add a method upstream to > >>>> WriteBatchInterface > >>>>>>> that provides the exact size of the batch, but that would require > an > >>>>>>> upgrade of RocksDB, which won't happen soon. So for the time being, > >>>> we're > >>>>>>> stuck with an approximation, so I felt that the new method should > >>>> reflect > >>>>>>> that. > >>>>>>> > >>>>>>> Would you prefer the new method name ignores this constraint and > that > >>>> we > >>>>>>> simply make the rocks measurement more accurate in the future? > >>>>>>> > >>>>>>> 6. > >>>>>>> Done > >>>>>>> > >>>>>>> 7. > >>>>>>> Very good point. 
The KIP already specifically calls out memory in > the > >>>>>>> documentation of the config: "Maximum number of memory bytes to be > >> used > >>>>>> to > >>>>>>> buffer uncommitted state-store records." - did you have something > >> else > >>>> in > >>>>>>> mind? > >>>>>>> > >>>>>>> Should we also make this clearer by renaming the config property > >>>> itself? > >>>>>>> Perhaps to something like statestore.transaction.buffer.max.bytes? > >>>>>>> > >>>>>>> 8. > >>>>>>> OK, I can remove this. The intent here was to describe how Streams > >>>> itself > >>>>>>> will manage transaction roll-over etc. Presumably that means we > also > >>>>>> don't > >>>>>>> need a description of how Streams will manage the commit of > changelog > >>>>>>> transactions, state store transactions and checkpointing? > >>>>>>> > >>>>>>> 9. > >>>>>>> What do you mean by fail-over? Do you mean failing over an Active > >> Task > >>>> to > >>>>>>> an instance already hosting a Standby Task? > >>>>>>> > >>>>>>> Thanks again and sorry for the essay of a response! > >>>>>>> > >>>>>>> Regards, > >>>>>>> Nick > >>>>>>> > >>>>>>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> > >>>> wrote: > >>>>>>> > >>>>>>>> Hi Nick, > >>>>>>>> > >>>>>>>> Thanks for the updates! > >>>>>>>> > >>>>>>>> I really appreciate that you simplified the KIP by removing some > >>>>>>>> aspects. As I have already told you, I think the removed aspects > are > >>>>>>>> also good ideas and we can discuss them on follow-up KIPs. > >>>>>>>> > >>>>>>>> Regarding the current KIP, I have the following feedback. > >>>>>>>> > >>>>>>>> 1. > >>>>>>>> Is there a good reason to add method newTransaction() to the > >>>> StateStore > >>>>>>>> interface? As far as I understand, the idea is that users of a > state > >>>>>>>> store (transactional or not) call this method at start-up and > after > >>>> each > >>>>>>>> commit. Since the call to newTransaction() is done in any case > and I > >>>>>>>> think it would simplify the caller code if we just start a new > >>>>>>>> transaction after a commit in the implementation? > >>>>>>>> As far as I understand, you plan to commit the transaction in the > >>>>>>>> flush() method. I find the idea to replace flush() with commit() > >>>>>>>> presented in KIP-844 an elegant solution. > >>>>>>>> > >>>>>>>> 2. > >>>>>>>> Why is the method to query the isolation level added to the state > >>>> store > >>>>>>>> context? > >>>>>>>> > >>>>>>>> 3. > >>>>>>>> Do we need all the isolation level definitions? I think it is good > >> to > >>>>>>>> know the guarantees of the transactionality of the state store. > >>>>>>>> However, currently, Streams guarantees that there will only be one > >>>>>>>> transaction that writes to the state store. Only the stream thread > >>>> that > >>>>>>>> executes the active task that owns the state store will write to > the > >>>>>>>> state store. I think it should be enough to know if the state > store > >> is > >>>>>>>> transactional or not. So my proposal would be to just add a method > >> on > >>>>>>>> the state store interface the returns if a state store is > >>>> transactional > >>>>>>>> or not by returning a boolean or an enum. > >>>>>>>> > >>>>>>>> 4. > >>>>>>>> I am wondering why AbstractTransaction and > >> AbstractTransactionalStore > >>>>>>>> are part of the KIP. They look like implementation details that > >> should > >>>>>>>> not be exposed in the public API. > >>>>>>>> > >>>>>>>> 5. 
> >>>>>>>> Why does StateStore#approximateNumUncommittedBytes() return an > >>>>>>>> approximate number of bytes? > >>>>>>>> > >>>>>>>> 6. > >>>>>>>> RocksDB is just one implementation of the state stores in Streams. > >>>>>>>> However, the issues regarding OOM errors might also apply to other > >>>>>>>> custom implementations. So in the KIP I would extract that part > from > >>>>>>>> section "RocksDB Transaction". I would also move section "RocksDB > >>>>>>>> Transaction" to the end of section "Proposed Changes" and handle > it > >> as > >>>>>>>> an example implementation for a state store. > >>>>>>>> > >>>>>>>> 7. > >>>>>>>> Should statestore.uncommitted.max.bytes only limit the uncommitted > >>>> bytes > >>>>>>>> or the uncommitted bytes that reside in memory? In future, other > >>>>>>>> transactional state store implementations might implement a buffer > >> for > >>>>>>>> uncommitted records that are able to spill records on disk. I > think > >>>>>>>> statestore.uncommitted.max.bytes needs to limit the uncommitted > >> bytes > >>>>>>>> irrespective if they reside in memory or disk. Since Streams will > >> use > >>>>>>>> this config to decide if it needs to trigger a commit, state store > >>>>>>>> implementations that can spill to disk will never be able to spill > >> to > >>>>>>>> disk. You would only need to change the doc of the config, if you > >>>> agree > >>>>>>>> with me. > >>>>>>>> > >>>>>>>> 8. > >>>>>>>> Section "Transaction Management" about the wrappers is rather a > >>>>>>>> implementation detail that should not be in the KIP. > >>>>>>>> > >>>>>>>> 9. > >>>>>>>> Could you add a section that describes how failover will work with > >> the > >>>>>>>> transactional state stores? I think section "Error handling" is > >>>> already > >>>>>>>> a good start. > >>>>>>>> > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Bruno > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On 15.05.23 11:04, Nick Telford wrote: > >>>>>>>>> Hi everyone, > >>>>>>>>> > >>>>>>>>> Quick update: I've added a new section to the KIP: "Offsets for > >>>>>> Consumer > >>>>>>>>> Rebalances", that outlines my solution to the problem that > >>>>>>>>> StreamsPartitionAssignor needs to read StateStore offsets even if > >>>>>> they're > >>>>>>>>> not currently open. > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Nick > >>>>>>>>> > >>>>>>>>> On Wed, 3 May 2023 at 11:34, Nick Telford < > nick.telf...@gmail.com> > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Bruno, > >>>>>>>>>> > >>>>>>>>>> Thanks for reviewing my proposal. > >>>>>>>>>> > >>>>>>>>>> 1. > >>>>>>>>>> The main reason I added it was because it was easy to do. If we > >> see > >>>> no > >>>>>>>>>> value in it, I can remove it. > >>>>>>>>>> > >>>>>>>>>> 2. > >>>>>>>>>> Global StateStores can have multiple partitions in their input > >>>> topics > >>>>>>>>>> (which function as their changelogs), so they would have more > than > >>>> one > >>>>>>>>>> partition. > >>>>>>>>>> > >>>>>>>>>> 3. > >>>>>>>>>> That's a good point. At present, the only method it adds is > >>>>>>>>>> isolationLevel(), which is likely not necessary outside of > >>>>>> StateStores. > >>>>>>>>>> It *does* provide slightly different guarantees in the > >> documentation > >>>>>> to > >>>>>>>>>> several of the methods (hence the overrides). I'm not sure if > this > >>>> is > >>>>>>>>>> enough to warrant a new interface though. 
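(To illustrate the approximation discussed around approximateNumUncommittedBytes(): RocksDB does not expose the true memory footprint of a WriteBatchWithIndex, so the sketch below simply sums key and value lengths on each write, which under-counts record overheads and the index. Hypothetical class, not the KIP's code.)

    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;

    // Sketch: approximate the size of an uncommitted batch by summing key and
    // value lengths on each write; per-record overhead and the batch's index
    // are not included, so this is a lower bound.
    final class UncommittedBytesTracker {
        private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
        private long approximateBytes = 0L;

        void put(final byte[] key, final byte[] value) throws RocksDBException {
            batch.put(key, value);
            approximateBytes += key.length + value.length;
        }

        long approximateNumUncommittedBytes() {
            return approximateBytes;
        }
    }
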
> >>>>>>>>>> I think the question that remains is whether this interface > makes > >> it > >>>>>>>>>> easier to implement custom transactional StateStores than if we > >> were > >>>>>> to > >>>>>>>>>> remove it? Probably not. > >>>>>>>>>> > >>>>>>>>>> 4. > >>>>>>>>>> The main motivation for the Atomic Checkpointing is actually > >>>>>>>> performance. > >>>>>>>>>> My team has been testing out an implementation of this KIP > without > >>>> it, > >>>>>>>> and > >>>>>>>>>> we had problems with RocksDB doing *much* more compaction, due > to > >>>> the > >>>>>>>>>> significantly increased flush rate. It was enough of a problem > >> that > >>>>>> (for > >>>>>>>>>> the time being), we had to revert back to Kafka Streams proper. > >>>>>>>>>> I think the best way to solve this, as you say, is to keep the > >>>>>>>> .checkpoint > >>>>>>>>>> files *in addition* to the offsets being stored within the store > >>>>>> itself. > >>>>>>>>>> Essentially, when closing StateStores, we force a memtable > flush, > >>>> then > >>>>>>>>>> call getCommittedOffsets and write those out to the .checkpoint > >>>> file. > >>>>>>>>>> That would ensure the metadata is available to the > >>>>>>>>>> StreamsPartitionAssignor for all closed stores. > >>>>>>>>>> If there's a crash (no clean close), then we won't be able to > >>>>>> guarantee > >>>>>>>>>> which offsets were flushed to disk by RocksDB, so we'd need to > >> open > >>>> ( > >>>>>>>>>> init()), read offsets, and then close() those stores. But since > >> this > >>>>>> is > >>>>>>>>>> the exception, and will only occur once (provided it doesn't > crash > >>>>>> every > >>>>>>>>>> time!), I think the performance impact here would be acceptable. > >>>>>>>>>> > >>>>>>>>>> Thanks for the feedback, please let me know if you have any more > >>>>>>>> comments > >>>>>>>>>> or questions! > >>>>>>>>>> > >>>>>>>>>> I'm currently working on rebasing against trunk. This involves > >>>> adding > >>>>>>>>>> support for transactionality to VersionedStateStores. I will > >>>> probably > >>>>>>>> need > >>>>>>>>>> to revise my implementation for transactional "segmented" > stores, > >>>> both > >>>>>>>> to > >>>>>>>>>> accommodate VersionedStateStore, and to clean up some other > stuff. > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> Nick > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> > >>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Nick, > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the updates! > >>>>>>>>>>> > >>>>>>>>>>> I have a couple of questions/comments. > >>>>>>>>>>> > >>>>>>>>>>> 1. > >>>>>>>>>>> Why do you propose a configuration that involves max. bytes and > >>>> max. > >>>>>>>>>>> reords? I think we are mainly concerned about memory > consumption > >>>>>>>> because > >>>>>>>>>>> we want to limit the off-heap memory used. I cannot think of a > >> case > >>>>>>>>>>> where one would want to set the max. number of records. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 2. > >>>>>>>>>>> Why does > >>>>>>>>>>> > >>>>>>>>>>> default void commit(final Map<TopicPartition, Long> > >>>>>>>> changelogOffsets) { > >>>>>>>>>>> flush(); > >>>>>>>>>>> } > >>>>>>>>>>> > >>>>>>>>>>> take a map of partitions to changelog offsets? > >>>>>>>>>>> The mapping between state stores to partitions is a 1:1 > >>>> relationship. > >>>>>>>>>>> Passing in a single changelog offset should suffice. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 3. > >>>>>>>>>>> Why do we need the Transaction interface? 
It should be possible > >> to > >>>>>> hide > >>>>>>>>>>> beginning and committing a transactions withing the state store > >>>>>>>>>>> implementation, so that from outside the state store, it does > not > >>>>>>>> matter > >>>>>>>>>>> whether the state store is transactional or not. What would be > >> the > >>>>>>>>>>> advantage of using the Transaction interface? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 4. > >>>>>>>>>>> Regarding checkpointing offsets, I think we should keep the > >>>>>> checkpoint > >>>>>>>>>>> file in any case for the reason you mentioned about > rebalancing. > >>>> Even > >>>>>>>> if > >>>>>>>>>>> that would not be an issue, I would propose to move the change > to > >>>>>>>> offset > >>>>>>>>>>> management to a new KIP and to not add more complexity than > >> needed > >>>> to > >>>>>>>>>>> this one. I would not be too concerned about the consistency > >>>>>> violation > >>>>>>>>>>> you mention. As far as I understand, with transactional state > >>>> stores > >>>>>>>>>>> Streams would write the checkpoint file during every commit > even > >>>>>> under > >>>>>>>>>>> EOS. In the failure case you describe, Streams would restore > the > >>>>>> state > >>>>>>>>>>> stores from the offsets found in the checkpoint file written > >> during > >>>>>> the > >>>>>>>>>>> penultimate commit instead of during the last commit. > Basically, > >>>>>>>> Streams > >>>>>>>>>>> would overwrite the records written to the state store between > >> the > >>>>>> last > >>>>>>>>>>> two commits with the same records read from the changelogs. > >> While I > >>>>>>>>>>> understand that this is wasteful, it is -- at the same time -- > >>>>>>>>>>> acceptable and most importantly it does not break EOS. > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Bruno > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 27.04.23 12:34, Nick Telford wrote: > >>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>> > >>>>>>>>>>>> I find myself (again) considering removing the offset > management > >>>>>> from > >>>>>>>>>>>> StateStores, and keeping the old checkpoint file system. The > >>>> reason > >>>>>> is > >>>>>>>>>>> that > >>>>>>>>>>>> the StreamPartitionAssignor directly reads checkpoint files in > >>>> order > >>>>>>>> to > >>>>>>>>>>>> determine which instance has the most up-to-date copy of the > >> local > >>>>>>>>>>> state. > >>>>>>>>>>>> If we move offsets into the StateStore itself, then we will > need > >>>> to > >>>>>>>>>>> open, > >>>>>>>>>>>> initialize, read offsets and then close each StateStore (that > is > >>>> not > >>>>>>>>>>>> already assigned and open) for which we have *any* local > state, > >> on > >>>>>>>> every > >>>>>>>>>>>> rebalance. > >>>>>>>>>>>> > >>>>>>>>>>>> Generally, I don't think there are many "orphan" stores like > >> this > >>>>>>>>>>> sitting > >>>>>>>>>>>> around on most instances, but even a few would introduce > >>>> additional > >>>>>>>>>>> latency > >>>>>>>>>>>> to an already somewhat lengthy rebalance procedure. > >>>>>>>>>>>> > >>>>>>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping > >>>> things > >>>>>>>> in > >>>>>>>>>>> the > >>>>>>>>>>>> checkpoint file(s) for now, and not worrying about the race. > The > >>>>>>>>>>> downside > >>>>>>>>>>>> is that we wouldn't be able to remove the explicit RocksDB > flush > >>>>>>>>>>> on-commit, > >>>>>>>>>>>> which likely hurts performance. > >>>>>>>>>>>> > >>>>>>>>>>>> If anyone has any thoughts or ideas on this subject, I would > >>>>>>>> appreciate > >>>>>>>>>>> it! 
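(For readers less familiar with the .checkpoint files being weighed here: each task directory holds a small file mapping changelog partitions to the offsets already persisted in local state, managed by the internal OffsetCheckpoint class. A minimal sketch with a made-up path and offset:)

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointFileSketch {
        public static void main(String[] args) throws IOException {
            // Hypothetical task directory; Streams keeps one .checkpoint per task dir.
            final OffsetCheckpoint checkpoint =
                new OffsetCheckpoint(new File("/tmp/state-dir/0_0/.checkpoint"));

            // Written after state has been flushed (under EOS, historically only on clean shutdown).
            checkpoint.write(Collections.singletonMap(
                new TopicPartition("my-store-changelog", 0), 42L));

            // Read during assignment to find the most caught-up instance.
            final Map<TopicPartition, Long> offsets = checkpoint.read();
            System.out.println(offsets);
        }
    }
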
> >>>>>>>>>>>> > >>>>>>>>>>>> Regards, > >>>>>>>>>>>> Nick > >>>>>>>>>>>> > >>>>>>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford < > >>>> nick.telf...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>> > >>>>>>>>>>>>> The issue is that if there's a crash between 2 and 3, then > you > >>>>>> still > >>>>>>>>>>> end > >>>>>>>>>>>>> up with inconsistent data in RocksDB. The only way to > guarantee > >>>>>> that > >>>>>>>>>>> your > >>>>>>>>>>>>> checkpoint offsets and locally stored data are consistent > with > >>>> each > >>>>>>>>>>> other > >>>>>>>>>>>>> are to atomically commit them, which can be achieved by > having > >>>> the > >>>>>>>>>>> offsets > >>>>>>>>>>>>> stored in RocksDB. > >>>>>>>>>>>>> > >>>>>>>>>>>>> The offsets column family is likely to be extremely small > (one > >>>>>>>>>>>>> per-changelog partition + one per Topology input partition > for > >>>>>>>> regular > >>>>>>>>>>>>> stores, one per input partition for global stores). So the > >>>> overhead > >>>>>>>>>>> will be > >>>>>>>>>>>>> minimal. > >>>>>>>>>>>>> > >>>>>>>>>>>>> A major benefit of doing this is that we can remove the > >> explicit > >>>>>>>> calls > >>>>>>>>>>> to > >>>>>>>>>>>>> db.flush(), which forcibly flushes memtables to disk > on-commit. > >>>> It > >>>>>>>>>>> turns > >>>>>>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka > >>>> Streams > >>>>>>>>>>>>> commits, *not* RocksDB configuration, which could be a major > >>>> source > >>>>>>>> of > >>>>>>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these > >>>>>>>> explicit > >>>>>>>>>>>>> flushes, because it no longer matters exactly when RocksDB > >>>> flushes > >>>>>>>>>>> data to > >>>>>>>>>>>>> disk; since the data and corresponding checkpoint offsets > will > >>>>>> always > >>>>>>>>>>> be > >>>>>>>>>>>>> flushed together, the local store is always in a consistent > >>>> state, > >>>>>>>> and > >>>>>>>>>>>>> on-restart, it can always safely resume restoration from the > >>>>>> on-disk > >>>>>>>>>>>>> offsets, restoring the small amount of data that hadn't been > >>>>>> flushed > >>>>>>>>>>> when > >>>>>>>>>>>>> the app exited/crashed. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> Nick > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy < > >> c...@littlehorse.io> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for your reply. Ack to A) and B). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> For item C), I see what you're referring to. Your proposed > >>>>>> solution > >>>>>>>>>>> will > >>>>>>>>>>>>>> work, so no need to change it. What I was suggesting was > that > >> it > >>>>>>>>>>> might be > >>>>>>>>>>>>>> possible to achieve this with only one column family. So > long > >>>> as: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> - No uncommitted records (i.e. not committed to the > >>>>>> changelog) > >>>>>>>> are > >>>>>>>>>>>>>> *committed* to the state store, AND > >>>>>>>>>>>>>> - The Checkpoint offset (which refers to the > changelog > >>>>>> topic) > >>>>>>>> is > >>>>>>>>>>> less > >>>>>>>>>>>>>> than or equal to the last written changelog offset > in > >>>>>> rocksdb > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I don't see the need to do the full restoration from > scratch. 
> >> My > >>>>>>>>>>>>>> understanding was that prior to 844/892, full restorations > >> were > >>>>>>>>>>> required > >>>>>>>>>>>>>> because there could be uncommitted records written to > RocksDB; > >>>>>>>>>>> however, > >>>>>>>>>>>>>> given your use of RocksDB transactions, that can be avoided > >> with > >>>>>> the > >>>>>>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB > >>>>>>>>>>> transaction, 3) > >>>>>>>>>>>>>> update offset in checkpoint file. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Anyways, your proposed solution works equivalently and I > don't > >>>>>>>> believe > >>>>>>>>>>>>>> there is much overhead to an additional column family in > >>>> RocksDB. > >>>>>>>>>>> Perhaps > >>>>>>>>>>>>>> it may even perform better than making separate writes to > the > >>>>>>>>>>> checkpoint > >>>>>>>>>>>>>> file. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford < > >>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from > >> the > >>>>>>>> rest > >>>>>>>>>>> of > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> Streams engine. The fact that there will be only one > ongoing > >>>>>>>> (write) > >>>>>>>>>>>>>>> transaction at a time is not guaranteed by any API, and is > >>>> just a > >>>>>>>>>>>>>>> consequence of the way Streams operates. To that end, I > tried > >>>> to > >>>>>>>>>>> ensure > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> documentation and guarantees provided by the new APIs are > >>>>>>>>>>> independent of > >>>>>>>>>>>>>>> this incidental behaviour. In practice, you're right, this > >>>>>>>>>>> essentially > >>>>>>>>>>>>>>> refers to "interactive queries", which are technically > "read > >>>>>>>>>>>>>> transactions", > >>>>>>>>>>>>>>> even if they don't actually use the transaction API to > >> isolate > >>>>>>>>>>>>>> themselves. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> B. Yes, although not ideal. This is for backwards > >>>> compatibility, > >>>>>>>>>>>>>> because: > >>>>>>>>>>>>>>> 1) Existing custom StateStore implementations > will > >>>>>> implement > >>>>>>>>>>>>>> flush(), > >>>>>>>>>>>>>>> and not commit(), but the Streams engine now calls > commit(), > >> so > >>>>>>>> those > >>>>>>>>>>>>>> calls > >>>>>>>>>>>>>>> need to be forwarded to flush() for these legacy stores. > >>>>>>>>>>>>>>> 2) Existing StateStore *users*, i.e. outside of > the > >>>>>> Streams > >>>>>>>>>>> engine > >>>>>>>>>>>>>>> itself, may depend on explicitly calling flush(), so for > >> these > >>>>>>>> cases, > >>>>>>>>>>>>>>> flush() needs to be redirected to call commit(). > >>>>>>>>>>>>>>> If anyone has a better way to guarantee compatibility > without > >>>>>>>>>>>>>> introducing > >>>>>>>>>>>>>>> this potential recursion loop, I'm open to changes! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section. > >>>>>> Offsets > >>>>>>>>>>> are > >>>>>>>>>>>>>>> stored in a separate RocksDB column family, which is > >> guaranteed > >>>>>> to > >>>>>>>> be > >>>>>>>>>>>>>>> atomically flushed to disk with all other column families. 
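(A rough illustration of that idea: the changelog offset and the data it covers are written through the same RocksDB WriteBatch, one to an "offsets" column family and one to the data column family, so they are committed atomically. Names and paths are illustrative; this is not the KIP's code.)

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class AtomicOffsetSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                new ColumnFamilyDescriptor("offsets".getBytes(StandardCharsets.UTF_8)));
            final List<ColumnFamilyHandle> handles = new ArrayList<>();
            try (DBOptions dbOptions = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true);
                 RocksDB db = RocksDB.open(dbOptions, "/tmp/example-db", descriptors, handles);
                 WriteBatch batch = new WriteBatch();
                 WriteOptions writeOptions = new WriteOptions()) {

                final ColumnFamilyHandle data = handles.get(0);
                final ColumnFamilyHandle offsets = handles.get(1);

                // Data and its checkpoint offset land in the same atomic batch...
                batch.put(data, "some-key".getBytes(), "some-value".getBytes());
                batch.put(offsets, "my-store-changelog-0".getBytes(),
                          ByteBuffer.allocate(Long.BYTES).putLong(42L).array());

                // ...so they are flushed and recovered together, never separately.
                db.write(writeOptions, batch);

                // Close column family handles before the DB is closed.
                data.close();
                offsets.close();
            }
        }
    }
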
> >> The > >>>>>>>> issue > >>>>>>>>>>> of > >>>>>>>>>>>>>>> checkpoints being written to disk after commit causing > >>>>>>>> inconsistency > >>>>>>>>>>> if > >>>>>>>>>>>>>> it > >>>>>>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint > >>>> files > >>>>>>>> are > >>>>>>>>>>>>>> only > >>>>>>>>>>>>>>> written on clean shutdown. This is one of the major causes > of > >>>>>> "full > >>>>>>>>>>>>>>> restorations", so moving the offsets into a place where > they > >>>> can > >>>>>> be > >>>>>>>>>>>>>>> guaranteed to be atomically written with the data they > >>>> checkpoint > >>>>>>>>>>>>>> allows us > >>>>>>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just > >> on > >>>>>>>> clean > >>>>>>>>>>>>>>> shutdown. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy < > >>>> c...@littlehorse.io> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thank you for continuing this work. I have a few minor > >>>>>> clarifying > >>>>>>>>>>>>>>>> questions. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> A) "Records written to any transaction are visible to all > >>>> other > >>>>>>>>>>>>>>>> transactions immediately." I am confused here—I thought > >> there > >>>>>>>> could > >>>>>>>>>>>>>> only > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>> one transaction going on at a time for a given state store > >>>> given > >>>>>>>> the > >>>>>>>>>>>>>>>> threading model for processing records on a Task. Do you > >> mean > >>>>>>>>>>>>>> Interactive > >>>>>>>>>>>>>>>> Queries by "other transactions"? (If so, then everything > >> makes > >>>>>>>>>>> sense—I > >>>>>>>>>>>>>>>> thought that since IQ were read-only then they didn't > count > >> as > >>>>>>>>>>>>>>>> transactions). > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> B) Is it intentional that the default implementations of > the > >>>>>>>> flush() > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> commit() methods in the StateStore class refer to each > other > >>>> in > >>>>>>>> some > >>>>>>>>>>>>>> sort > >>>>>>>>>>>>>>>> of unbounded recursion? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> C) How will the getCommittedOffset() method work? At > first I > >>>>>>>> thought > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> way to do it would be using a special key in the RocksDB > >> store > >>>>>> to > >>>>>>>>>>>>>> store > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> offset, and committing that with the transaction. But upon > >>>>>> second > >>>>>>>>>>>>>>> thought, > >>>>>>>>>>>>>>>> since restoration from the changelog is an idempotent > >>>>>> procedure, I > >>>>>>>>>>>>>> think > >>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and > then > >> 2) > >>>>>>>> write > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash > >>>> between > >>>>>>>> 1) > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>> 2), > >>>>>>>>>>>>>>>> I think the only downside is now we replay a few more > >> records > >>>>>> (at > >>>>>>>> a > >>>>>>>>>>>>>> cost > >>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>> <100ms). Am I missing something there? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Other than that, everything makes sense to me. 
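(Regarding question B: a stripped-down, hypothetical sketch of the mutually-defaulting methods being discussed -- the exact defaults in the KIP may differ. A store that overrides neither method would indeed recurse; the compatibility assumption is that legacy stores override flush() while new stores override commit().)

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch only; not the KIP's exact StateStore interface.
    interface TransactionalStoreSketch {

        // New API: legacy stores that only implement flush() still work,
        // because commit() falls back to flush().
        default void commit(final Map<TopicPartition, Long> changelogOffsets) {
            flush();
        }

        // Legacy API: callers that still invoke flush() are redirected to commit().
        // If an implementation overrides neither method, the two defaults call
        // each other without bound -- the recursion asked about above.
        default void flush() {
            commit(Collections.emptyMap());
        }
    }
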
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < > >>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the > >>>>>> design: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> There are several changes in there that reflect feedback > >> from > >>>>>>>> this > >>>>>>>>>>>>>>>> thread, > >>>>>>>>>>>>>>>>> and there's a new section and a bunch of interface > changes > >>>>>>>> relating > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the > >> puzzle > >>>> to > >>>>>>>>>>>>>> making > >>>>>>>>>>>>>>>>> everything robust. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Let me know what you think! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford < > >>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Lucas, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks for looking over my KIP. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a > >> typo > >>>> in > >>>>>>>> the > >>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>> that I've now corrected. It was originally per-Task, > but I > >>>>>>>>>>>>>> changed it > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> per-instance for exactly the reason you highlighted. > >>>>>>>>>>>>>>>>>> B) It's worth noting that transactionality is only > enabled > >>>>>> under > >>>>>>>>>>>>>> EOS, > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> in the default mode of operation (ALOS), there should be > >> no > >>>>>>>>>>>>>> change in > >>>>>>>>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the > >>>>>> impact > >>>>>>>> on > >>>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>> sufficiently low default values for the memory bound > >>>>>>>>>>>>>> configuration. I > >>>>>>>>>>>>>>>>>> understand your hesitation to include a significant > change > >>>> of > >>>>>>>>>>>>>>>> behaviour, > >>>>>>>>>>>>>>>>>> especially in a minor release, but I suspect that most > >> users > >>>>>>>> will > >>>>>>>>>>>>>>>> prefer > >>>>>>>>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour > of > >>>>>>>>>>>>>> frequent > >>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>> restorations! If this is a problem, the changes can wait > >>>> until > >>>>>>>> the > >>>>>>>>>>>>>>> next > >>>>>>>>>>>>>>>>>> major release. I'll be running a patched version of > >> streams > >>>> in > >>>>>>>>>>>>>>>> production > >>>>>>>>>>>>>>>>>> with these changes as soon as they're ready, so it won't > >>>>>> disrupt > >>>>>>>>>>>>>> me > >>>>>>>>>>>>>>> :-D > >>>>>>>>>>>>>>>>>> C) The main purpose of this sentence was just to note > that > >>>>>> some > >>>>>>>>>>>>>>> changes > >>>>>>>>>>>>>>>>>> will need to be made to the way Segments are handled in > >>>> order > >>>>>> to > >>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>> they also benefit from transactions. 
At the time I wrote > >>>> it, I > >>>>>>>>>>>>>> hadn't > >>>>>>>>>>>>>>>>>> figured out the specific changes necessary, so it was > >>>>>>>> deliberately > >>>>>>>>>>>>>>>> vague. > >>>>>>>>>>>>>>>>>> This is the one outstanding problem I'm currently > working > >>>> on, > >>>>>>>> and > >>>>>>>>>>>>>>> I'll > >>>>>>>>>>>>>>>>>> update this section with more detail once I have figured > >> out > >>>>>> the > >>>>>>>>>>>>>>> exact > >>>>>>>>>>>>>>>>>> changes required. > >>>>>>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation > >>>>>> guarantees. > >>>>>>>>>>>>>>> While > >>>>>>>>>>>>>>>>>> the RocksDB implementation of transactions doesn't > >>>> technically > >>>>>>>>>>>>>> *need* > >>>>>>>>>>>>>>>>>> read-only users to call newTransaction(), other > >>>>>> implementations > >>>>>>>>>>>>>>> (e.g. a > >>>>>>>>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling > >>>>>>>>>>>>>> newTransaction() > >>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>> no transaction is necessary is essentially free, as it > >> will > >>>>>> just > >>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>> this. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I > think > >> it > >>>>>>>>>>>>>> should > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> fairly obvious where the performance problems stem from: > >>>>>> writes > >>>>>>>>>>>>>> under > >>>>>>>>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it > with > >>>> the > >>>>>>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the > >>>>>> tombstone/record > >>>>>>>>>>>>>> flag, > >>>>>>>>>>>>>>>>> and 1 > >>>>>>>>>>>>>>>>>> to copy the record from the "temporary" store to the > >> "main" > >>>>>>>> store, > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> transaction commits. The different approach taken by > >> KIP-869 > >>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>> perform > >>>>>>>>>>>>>>>>>> much better, as it avoids all these copies, and may > >> actually > >>>>>>>>>>>>>> perform > >>>>>>>>>>>>>>>>>> slightly better than trunk, due to batched writes in > >> RocksDB > >>>>>>>>>>>>>>> performing > >>>>>>>>>>>>>>>>>> better than non-batched writes.[1] > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < > >>>>>>>>>>>>>> lbruts...@confluent.io > >>>>>>>>>>>>>>>>> .invalid> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion > >> about > >>>>>>>>>>>>>> KIP-892 > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think > >>>>>>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do > >> have > >>>>>> some > >>>>>>>>>>>>>>>>>>> questions about the latest draft. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> A) If I understand correctly, you propose to > put a > >>>> bound > >>>>>> on > >>>>>>>> the > >>>>>>>>>>>>>>>>>>> (native) memory consumed by each task. However, I > wonder > >> if > >>>>>>>> this > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the > >> cluster. 
> >>>>>> For > >>>>>>>>>>>>>>>>>>> example, depending on the timing of rebalances during a > >>>>>> cluster > >>>>>>>>>>>>>>>>>>> restart, it could happen that a single streams node is > >>>>>>>> assigned a > >>>>>>>>>>>>>>> lot > >>>>>>>>>>>>>>>>>>> more tasks than expected. With your proposed change, > this > >>>>>> would > >>>>>>>>>>>>>> mean > >>>>>>>>>>>>>>>>>>> that the memory required by this one node could be a > >>>> multiple > >>>>>>>> of > >>>>>>>>>>>>>>> what > >>>>>>>>>>>>>>>>>>> is required during normal operation. I wonder if it > >>>> wouldn't > >>>>>> be > >>>>>>>>>>>>>>> safer > >>>>>>>>>>>>>>>>>>> to put a global bound on the memory use, across all > >> tasks. > >>>>>>>>>>>>>>>>>>> B) Generally, the memory concerns still give me > the > >>>>>> feeling > >>>>>>>>>>> that > >>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>> should not be enabled by default for all users in a > minor > >>>>>>>>>>>>>> release. > >>>>>>>>>>>>>>>>>>> C) In section "Transaction Management": the > >> sentence > >>>> "A > >>>>>>>> similar > >>>>>>>>>>>>>>>>>>> analogue will be created to automatically manage > >> `Segment` > >>>>>>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some > >>>>>> background, > >>>>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>>>> do not understand this, it would be great if you could > >>>>>> clarify > >>>>>>>>>>>>>> what > >>>>>>>>>>>>>>>>>>> you mean here. > >>>>>>>>>>>>>>>>>>> D) Could you please clarify why IQ has to call > >>>>>>>>>>> newTransaction(), > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>> it's read-only. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if > >>>> there > >>>>>>>> is > >>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x > >>>>>> slower > >>>>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>>>> by providing a flame graph), that would be quite > >>>> interesting. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>> Lucas > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < > >>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design, > which > >>>>>>>>>>>>>> reflects > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> implementation I've been working on: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> This new design should address the outstanding points > >>>>>> already > >>>>>>>>>>>>>> made > >>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> thread. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear > >> or > >>>>>> need > >>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>> clarification. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm > confident > >>>> that > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> remaining > >>>>>>>>>>>>>>>>>>>> work (making Segments behave) will not impact the > >>>> documented > >>>>>>>>>>>>>>> design. 
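(Since WriteBatchWithIndex is repeatedly suggested as the underlying mechanism, a minimal sketch of the property that matters for this design: reads through the batch see uncommitted writes overlaid on the DB, direct DB reads still see only committed data, and the whole batch is applied atomically on commit. Illustrative keys and paths; not the KIP's implementation.)

    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    public class WriteBatchWithIndexSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/example-db");
                 WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
                 ReadOptions readOptions = new ReadOptions();
                 WriteOptions writeOptions = new WriteOptions()) {

                db.put("k".getBytes(), "committed".getBytes());
                batch.put("k".getBytes(), "uncommitted".getBytes());

                // Reads through the batch overlay uncommitted writes on the DB...
                final byte[] dirty = batch.getFromBatchAndDB(db, readOptions, "k".getBytes());
                // ...while direct DB reads still see only committed data.
                final byte[] clean = db.get("k".getBytes());
                System.out.println(new String(dirty) + " / " + new String(clean));

                // On commit, the batch is applied to the DB atomically.
                db.write(writeOptions, batch);
            }
        }
    }
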
> >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy < > >>>>>>>> c...@littlehorse.io > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was > hoping > >>>>>> that, > >>>>>>>>>>>>>>>> since > >>>>>>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the > >>>>>>>>>>>>>> documented > >>>>>>>>>>>>>>>> API, > >>>>>>>>>>>>>>>>>>> maybe > >>>>>>>>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major > release > >> to > >>>>>>>>>>>>>> make > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> change; > >>>>>>>>>>>>>>>>>>>>> but given that it would be considered a major > change, I > >>>>>> like > >>>>>>>>>>>>>>> your > >>>>>>>>>>>>>>>>>>> approach > >>>>>>>>>>>>>>>>>>>>> the best. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding! > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < > >>>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally > intended > >>>> to > >>>>>>>>>>>>>> try > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> keep the > >>>>>>>>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise > >> we'd > >>>>>>>>>>>>>> have > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> wait for > >>>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> major version release to land these changes. > >>>>>>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of > >>>>>>>>>>>>>> guarantee, > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> typically longer commit intervals would be > problematic > >>>>>> when > >>>>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>>>>>> "committed" records. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor > >> surgery, > >>>>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>>>> spent a > >>>>>>>>>>>>>>>>>>>>>> considerable amount of that time working through > ideas > >>>> for > >>>>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of > >> keeping > >>>>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>>>> as-is, but > >>>>>>>>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards, > >>>>>>>>>>>>>> although > >>>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>>> have a > >>>>>>>>>>>>>>>>>>>>>> solution that both expands on this, and provides for > >>>> some > >>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>> formal > >>>>>>>>>>>>>>>>>>>>>> guarantees. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for > >>>>>>>>>>>>>>>> "Transactions", > >>>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read > >> Committed" > >>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>> under > >>>>>>>>>>>>>>>>>>> EOS, > >>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS. 
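(Tying that mapping back to configuration: processing.guarantee is the existing switch between EOS and ALOS; the state-store buffer bound shown below is only a name proposed earlier in this thread and does not exist in released Kafka Streams. Hypothetical sketch:)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ConfigSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // EOS: stores would use "Read Committed" isolation under this proposal.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

            // Proposed in this thread (name still under discussion); NOT an existing config.
            props.put("statestore.transaction.buffer.max.bytes", String.valueOf(64 * 1024 * 1024));
        }
    }
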
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives > us > >>>>>> much > >>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>> clearly > >>>>>>>>>>>>>>>>>>>>>> defined isolation behaviour that can be properly > >>>>>>>>>>>>>> documented to > >>>>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>>>>>>>> know what to expect. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and > >> will > >>>>>>>>>>>>>> update > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>>> I have something. The main struggle is trying to > >>>> implement > >>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>>>>>>>> making any major changes to the existing interfaces > or > >>>>>>>>>>>>>>> breaking > >>>>>>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>>>>>> implementations, because currently everything > expects > >> to > >>>>>>>>>>>>>>> operate > >>>>>>>>>>>>>>>>>>> directly > >>>>>>>>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that > store. > >> I > >>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>> I'm > >>>>>>>>>>>>>>>>>>> getting > >>>>>>>>>>>>>>>>>>>>>> close, although sadly I won't be able to progress > much > >>>>>>>>>>>>>> until > >>>>>>>>>>>>>>>> next > >>>>>>>>>>>>>>>>>>> week > >>>>>>>>>>>>>>>>>>>>> due > >>>>>>>>>>>>>>>>>>>>>> to some work commitments. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < > >>>>>>>>>>>>>>> c...@littlehorse.io> > >>>>>