Hi Bruno, Thanks for reviewing the KIP, and for voting!
> I would make the constructor public and remove the static method make(). You are right. The static factory method is not providing much benefit for the VersionedRecord class so I will remove it in order to simplify the class. Best, Victoria On Mon, Dec 19, 2022 at 9:34 AM Bruno Cadonna <cado...@apache.org> wrote: > Hi Victoria, > > I am +1 on the KIP. I just have one minor comment: > Why do we need method > > public static <V> VersionedRecord<V> make(final V value, final long > timestamp) > > in the VersionedRecord? > > The public constructor would do exactly the same, wouldn't it? > > I would make the constructor public and remove the static method make(). > > Best, > Bruno > > On 15.12.22 20:58, Victoria Xia wrote: > > Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've > > just sent a message to start the vote on this KIP. Please have a look > when > > you get the chance. > > > > Thanks, > > Victoria > > > > On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax <mj...@apache.org> > wrote: > > > >> Thanks for clarifying about the null-question. SGTM. > >> > >> On 12/13/22 3:06 PM, Victoria Xia wrote: > >>> Hi Matthias, > >>> > >>> Thanks for chiming in! Barring objections from anyone on this thread, I > >>> will start the vote for this KIP on Thursday. That should be enough > time > >> to > >>> incorporate any lingering minor changes. > >>> > >>>> I slightly prefer to add `VersionedRecord` interface (also > >>> like the name). I agree that it's low overhead and providing a clean > >>> path forward for future changes seems worth it to me. > >>> > >>> OK, that makes two of us. I updated the KIP just now to formally > include > >>> VersionedRecord as the new return type from the various > >>> VersionedKeyValueStore methods. > >>> > >>>> if we introduce `VersionedRecord`, I think we can keep the not-null > >>> requirement for `ValueAndTimestamp` > >>> > >>> Not quite. 
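To make the outcome concrete, here is a minimal sketch of a record class with a public constructor in place of a static `make()` factory. The constructor still rejects null values. The class name `VersionedRecordSketch` and its accessor names are illustrative assumptions, not the final Kafka Streams API.

```java
import java.util.Objects;

// Illustrative sketch only: name and accessors are assumptions, not the shipped API.
final class VersionedRecordSketch<V> {
    private final V value;
    private final long timestamp;

    // Public constructor replaces the static make(value, timestamp) factory.
    public VersionedRecordSketch(final V value, final long timestamp) {
        // Read results never carry a null value; a missing/deleted record is a null reference instead.
        this.value = Objects.requireNonNull(value, "value cannot be null");
        this.timestamp = timestamp;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof VersionedRecordSketch)) return false;
        final VersionedRecordSketch<?> other = (VersionedRecordSketch<?>) o;
        return timestamp == other.timestamp && value.equals(other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }

    @Override
    public String toString() {
        return "VersionedRecordSketch(" + value + ", " + timestamp + ")";
    }
}
```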
VersionedRecord is only used as a return type from read > >> methods, > >>> which is why VersionedRecord is able to enforce that its value is never > >>> null. If the value being returned would have been null, then we return > a > >>> null VersionedRecord instead, rather than non-null VersionedRecord with > >>> null value. So, there's no use case for a VersionedRecord with null > >> value. > >>> > >>> In contrast, even though ValueAndTimestamp is not anywhere in the > public > >>> VersionedKeyValueStore interface, ValueAndTimestamp still needs to be > >> used > >>> internally when representing a versioned key-value store as a > >>> TimestampedKeyValueStore, since TimestampedKeyValueStore is used > >> everywhere > >>> throughout the internals of the codebase. In order to represent a > >> versioned > >>> key-value store as a TimestampedKeyValueStore, we have to support > `put(K > >>> key, ValueAndTimestamp<V> value)`, which means ValueAndTimestamp needs > to > >>> support null value (with timestamp). Otherwise we cannot put a > tombstone > >>> into a versioned key-value store when using the internal > >>> TimestampedKeyValueStore representation. > >>> > >>> It's very much an implementation detail that ValueAndTimestamp needs to > >> be > >>> relaxed to allow null values. I think this is a minor enough change > that > >> is > >>> still preferable to the alternatives (refactoring the processors to not > >>> require TimestampedKeyValueStore, or introducing a separate workaround > >>> `put()` method on the TimestampedKeyValueStore representation of > >> versioned > >>> key-value stores), so I have left it in as part of the KIP. > >>> > >>> Best, > >>> Victoria > >>> > >>> On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax <mj...@apache.org> > >> wrote: > >>> > >>>> Thanks Victoria. > >>>> > >>>> I did not re-read the KIP in full on the wiki but only your email. > >>>> > >>>> Points (1)-(8) SGTM. 
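The asymmetry described above can be sketched as follows. Tombstones must be writable through the timestamped view, but reads never surface a record with a null value. `NullableValueAndTimestamp`, `ReadRecord`, and `TimestampedViewSketch` are hypothetical, simplified stand-ins for `ValueAndTimestamp`, `VersionedRecord`, and the internal `TimestampedKeyValueStore` representation. The real classes have more methods and different internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical stand-in for the relaxed ValueAndTimestamp: value may be null (a tombstone).
final class NullableValueAndTimestamp<V> {
    final V value;
    final long timestamp;

    NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for VersionedRecord: value is never null.
final class ReadRecord<V> {
    final V value;
    final long timestamp;

    ReadRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value, "value cannot be null");
        this.timestamp = timestamp;
    }
}

// Sketch of a versioned store exposed through a put(key, ValueAndTimestamp) style view.
final class TimestampedViewSketch<K, V> {
    // Per key: timestamp -> value, where a null value marks a tombstone.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    // Write path: a null value with a timestamp is stored as a tombstone at that timestamp.
    void put(final K key, final NullableValueAndTimestamp<V> v) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(v.timestamp, v.value);
    }

    // Read path: a tombstone (or no version at all) surfaces as a null record,
    // never as a record holding a null value.
    ReadRecord<V> get(final K key, final long asOfTimestamp) {
        final TreeMap<Long, V> byTs = versions.get(key);
        if (byTs == null) return null;
        final Map.Entry<Long, V> latest = byTs.floorEntry(asOfTimestamp);
        if (latest == null || latest.getValue() == null) return null;
        return new ReadRecord<>(latest.getValue(), latest.getKey());
    }
}
```

The write type must accept a null value (the tombstone). The read type can keep its non-null check because a tombstone is reported as a null record.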
> >>>> > >>>> About (9): I slightly prefer to add `VersionedRecord` interface (I also > >>>> like the name). I agree that it's low overhead and providing a clean > >>>> path forward for future changes seems worth it to me. Btw: if we > >>>> introduce `VersionedRecord`, I think we can keep the not-null > >>>> requirement for `ValueAndTimestamp`, which seems a small side benefit. > >>>> (Btw: your code snippet in the KIP shows that `VersionedRecord` would > >>>> have a non-null requirement for the value, but I think it would need > to > >>>> allow null as value?) > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> On 12/7/22 5:23 PM, Victoria Xia wrote: > >>>>> Thanks for the discussion, Bruno, Sagar, and Matthias! > >>>>> > >>>>> It seems we've reached consensus on almost all of the discussion > >> points. > >>>>> I've updated the KIP with the following: > >>>>> 1) renamed "timestampTo" in `get(key, timestampTo)` to > "asOfTimestamp" > >> to > >>>>> clarify that this timestamp bound is inclusive, per the SQL guideline > >>>> that > >>>>> "AS OF <timestamp>" queries are inclusive. In the future, if we want > to > >>>>> introduce a timestamp range query, we can use `get(key, > timestampFrom, > >>>>> timestampTo)` and specify that timestampTo is exclusive in this > method, > >>>>> while avoiding confusion with the inclusive asOfTimestamp parameter > in > >>>> the > >>>>> other method, given that the names are different. > >>>>> 2) added a description of "history retention" semantics into the > >>>>> VersionedKeyValueStore interface Javadoc, and updated the Javadoc for > >>>>> `get(key, asOfTimestamp)` to mention explicitly that a null result is > >>>>> returned if the provided timestamp bound is not within history > >> retention. > >>>>> 3) added a `delete(key, timestamp)` method (with return type > >>>>> `ValueAndTimestamp<V>`) to the VersionedKeyValueStore interface.
> >>>>> 4) updated the Javadoc for `segmentInterval` to clarify that the only > >>>>> reason a user might be interested in this parameter is performance. > >>>>> > >>>>> Other points we discussed which did not result in updates include: > >>>>> 5) whether to automatically update the `min.compaction.lag.ms` > config > >> on > >>>>> changelog topics when history retention is changed -- there's support > >> for > >>>>> this but let's not bundle it with this KIP. We can have a separate > KIP > >> to > >>>>> change this behavior for the existing windowed changelog topics, in > >>>>> addition to versioned changelog topics. > >>>>> 6) should we expose segmentInterval in this KIP -- let's go ahead and > >>>>> expose it now since we'll almost certainly expose it (in this same > >>>> manner) > >>>>> in a follow-on KIP anyway, and so that poor performance for user > >>>> workloads > >>>>> is less likely to be a barrier for users getting started with this > >>>> feature. > >>>>> I updated the Javadoc for this parameter to clarify why the Javadoc > >>>>> mentions performance despite Javadocs typically not doing so. > >>>>> 7) `get(timestampFrom, timestampTo)` and other methods for IQ -- very > >>>>> important but deferred to a future KIP > >>>>> 8) `purge(key)`/`deleteAllVersions(key)` -- deferred to a future KIP > >>>>> > >>>>> That leaves only one unresolved discussion point: > >>>>> 9) whether to include validTo in the return types from `get(...)`. If > >> we > >>>> go > >>>>> with the current proposal of not including validTo in the return > type, > >>>> then > >>>>> it will not be easy to add it in the future (unless we want to add > >>>> validTo > >>>>> to ValueAndTimestamp, which feels odd to me). 
If we think we might > want > >>>> to > >>>>> have validTo in the future, we can change the return type of > `get(...)` > >>>> and > >>>>> `delete(...)` in this proposal from `ValueAndTimestamp<V>` to a new > >> type, > >>>>> e.g., `VersionedRecord<V>` or `RecordVersion<V>`, which today will > look > >>>> the > >>>>> same as `ValueAndTimestamp<V>` but in the future we can add validTo > if > >> we > >>>>> want. The cost is a new type which today looks the same as > >>>>> ValueAndTimestamp. > >>>>> > >>>>> Now that I think about it more, the cost to introducing a new type > >> seems > >>>>> relatively low. I've added a proposal towards the bottom of the KIP > >> here > >>>>> < > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores#KIP889:VersionedStateStores-Additionalreturntimestampsfromget(key,asOfTimestamp) > >>>>> . > >>>>> If others also believe that the cost of introducing this new > interface > >> is > >>>>> low (particularly relative to the flexibility it provides us for > being > >>>> able > >>>>> to evolve the class in the future), I will incorporate this proposal > >> into > >>>>> the KIP. I think the hardest part of this will be deciding on a name > >> for > >>>>> the new class :) > >>>>> > >>>>> Pending objections, I'd like to make a call on item (9) and call a > vote > >>>> on > >>>>> this KIP at the end of this week. > >>>>> > >>>>> Thanks, > >>>>> Victoria > >>>>> > >>>>> On Thu, Dec 1, 2022 at 9:47 PM Matthias J. Sax <mj...@apache.org> > >> wrote: > >>>>> > >>>>>> Thanks Victoria! > >>>>>> > >>>>>> (1) About `ReadOnlyVersionedKeyValueStore` -- I am not sure about > IQv1 > >>>>>> vs IQv2. But you might be right that adding the interface later > might > >>>>>> not be an issue -- so it does not matter. Just wanted to double > check. 
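Here is a minimal in-memory sketch of the semantics in points 1) to 3):

- an inclusive `asOfTimestamp`;
- a null result outside history retention;
- `delete(key, timestamp)` behaving like `put(key, null, timestamp)` while returning the record it replaced.

The sketch assumes that history retention is measured back from the largest timestamp the store has observed so far. It never physically expires old versions, and the class names are hypothetical. `delete` reads the version valid at `timestamp` before writing the tombstone, because a read issued after the tombstone is written would return null. The methods are `synchronized`, so that read-then-write is a single operation from the caller's point of view.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical read-side record for this sketch.
final class Versioned<V> {
    final V value;
    final long timestamp;

    Versioned(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// In-memory sketch of the discussed semantics; not the RocksDB-based implementation.
final class InMemoryVersionedStoreSketch<K, V> {
    private final long historyRetentionMs;
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE; // largest timestamp seen so far

    InMemoryVersionedStoreSketch(final long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    // A null value is a tombstone; older versions are kept so history stays queryable.
    synchronized void put(final K key, final V value, final long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    // asOfTimestamp is inclusive: a version written at exactly asOfTimestamp is visible.
    synchronized Versioned<V> get(final K key, final long asOfTimestamp) {
        // Assumed retention rule: bounds older than (stream time - retention) return null.
        if (observedStreamTime != Long.MIN_VALUE
                && asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null;
        }
        final TreeMap<Long, V> byTs = versions.get(key);
        if (byTs == null) return null;
        final Map.Entry<Long, V> entry = byTs.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) return null; // absent or tombstoned
        return new Versioned<>(entry.getValue(), entry.getKey());
    }

    // Latest version: syntactic sugar for get(key, Long.MAX_VALUE).
    synchronized Versioned<V> get(final K key) {
        return get(key, Long.MAX_VALUE);
    }

    // Same effect as put(key, null, timestamp), but returns the record valid at `timestamp`
    // before the tombstone was written (or null if there was none).
    synchronized Versioned<V> delete(final K key, final long timestamp) {
        final Versioned<V> replaced = get(key, timestamp);
        put(key, null, timestamp);
        return replaced;
    }
}
```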
> >>>>>> > >>>>>> > >>>>>> > >>>>>> (2) About `delete(key, ts)` -- as already discussed, I agree that it > >>>>>> should have the same semantics as `put(key, null, ts)` (delete() needs a > >>>>>> timestamp). Not sure if `delete()` really needs to return anything? > I > >>>>>> would be ok to make it `void` -- but I think it's also semantically > >>>>>> sound if it returns the "old" value at timestamp `ts` that the > delete > >>>>>> actually deleted, as you mentioned -- in the end, a "delete" is a > >>>>>> physical append anyway (i.e., "soft delete") as we want to track > >> history. > >>>>>> > >>>>>> > >>>>>> > >>>>>> (3) > >>>>>>> Ah, great question. I think the question boils down to: do we want > to > >>>>>>> require that all versioned stores (including custom user > >>>> implementations) > >>>>>>> use "history retention" to determine when to expire old record > >>>> versions? > >>>>>> > >>>>>> I personally think so. The main reason for this is that I think > we > >>>>>> need to have a clear contract so we can plug in custom > implementations > >>>>>> into the DSL later. I guess having a stricter contract > initially, > >>>>>> and relaxing it later if necessary, is an easier way forward than > >> the > >>>>>> other way around. > >>>>>> > >>>>>> For PAPI users, they are not bound to implement the interface anyway > >> and > >>>>>> can just add any store they like by extending the top-level > >> `StateStore` > >>>>>> interface. > >>>>>> > >>>>>> > >>>>>> > >>>>>> (4) About `segmentInterval`: I am personally fine both ways. Seems > >> it's > >>>>>> your call to expose it or not. It seems there is a slight preference > >> to > >>>>>> expose it. > >>>>>> > >>>>>> > >>>>>> > >>>>>> (5) About `validTo`: based on my experience, it's usually simpler to > >>>>>> have it exclusive. It's also how it's defined in "system versioned > >>>>>> temporal tables" in the SQL standard, and how `AS OF <ts>` queries > >> work.
> >>>>>> > >>>>>> For a join, it of course implies that if a table record has > [100,200) > >> as > >>>>>> inclusive `validFrom=100` and exclusive `validTo=200` it would only > >> join > >>>>>> with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200 > >> :)). > >>>>>> > >>>>>> I would strongly advocate making the upper bound exclusive (it did > >>>>>> serve us well in the past to align to SQL semantics). It must be > >> clearly > >>>>>> documented of course and we can also name variables accordingly if > >>>>>> necessary. > >>>>>> > >>>>>> > >>>>>> > >>>>>> (6) About including `validTo` in return types -- it's not easy to > >> change > >>>>>> the return type, because the signature of a method is only > determined > >> by > >>>>>> its name and input parameter types, i.e., we cannot overload an > existing > >>>>>> method to just change the return type, but would need to change its > >> name > >>>>>> or parameter list... Not sure if we can or cannot add `validTo` to > >>>>>> `ValueAndTimestamp` though, but it's a tricky question. Would be > good > >> to > >>>>>> get some more input from others on whether we think that it would be > important > >>>>>> enough to worry about it now or not. > >>>>>> > >>>>>> > >>>>>> > >>>>>> (7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would > prefer > >>>>>> to just keep `get()` with two overloads and not add `getAsOf()`; the > >>>>>> fact that we pass in a timestamp implies we have a point-in-time > >> query. > >>>>>> (It's cleaner API design to leverage method overloads IMHO, and it's > >>>>>> what we did in the past). Of course, we can name the parameter > >> `get(key, > >>>>>> asOfTimestamp)` if we think it's helpful. And in alignment with having > >>>>>> `validTo` exclusive, `validTo` would be `asOfTimestamp+1` (or > >> larger), > >>>>>> in case we return it.
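The exclusive upper bound from point (5) and the `asOfTimestamp+1` remark in point (7) can be illustrated with a tiny hypothetical `ValidityInterval` helper, which is not part of the proposed API. A table version valid over [validFrom, validTo) joins a stream-side record exactly when validFrom <= ts < validTo.

```java
// Hypothetical helper for the half-open convention [validFrom, validTo).
final class ValidityInterval {
    final long validFrom; // inclusive
    final long validTo;   // exclusive; Long.MAX_VALUE for the latest version

    ValidityInterval(final long validFrom, final long validTo) {
        if (validFrom >= validTo) {
            throw new IllegalArgumentException("validFrom must be < validTo");
        }
        this.validFrom = validFrom;
        this.validTo = validTo;
    }

    // A stream-side record at streamTs joins this table version iff validFrom <= streamTs < validTo.
    boolean contains(final long streamTs) {
        return validFrom <= streamTs && streamTs < validTo;
    }
}
```

With exclusive upper bounds, consecutive versions such as [100,200) and [200,300) partition the timeline without overlap. Any version returned for `get(key, asOfTimestamp)` then satisfies validTo >= asOfTimestamp + 1.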
> >>>>>> > >>>>>> > >>>>>> > >>>>>> (8) About updating topic config (i.e., history retention and > compaction > >>>>>> lag): I think it was actually an oversight to not update topic > >>>>>> configs if the code changes. There is actually a Jira ticket about > >> it. I > >>>>>> would prefer to keep the behavior consistent though and not change > it > >>>>>> just for the new versioned store, but change it globally in one shot > >>>>>> independent of this KIP. > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> > >>>>>> > >>>>>> On 12/1/22 10:15 AM, Sagar wrote: > >>>>>>> Thanks Victoria, > >>>>>>> > >>>>>>> I guess an advantage of exposing a method like delete(key, > timestamp) > >>>>>> could > >>>>>>> be that from a user's standpoint, it is a single operation and not > 2. > >>>> The > >>>>>>> equivalent of this method, i.e., put followed by get, is not atomic, so > >>>>>> exposing > >>>>>>> it certainly sounds like a good idea. > >>>>>>> > >>>>>>> Thanks! > >>>>>>> Sagar. > >>>>>>> > >>>>>>> On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia > >>>>>>> <victoria....@confluent.io.invalid> wrote: > >>>>>>> > >>>>>>>> Thanks, Sagar and Bruno, for your insights and comments! > >>>>>>>> > >>>>>>>>> Sagar: Can we name according to the semantics that you want to > >>>>>>>> support like `getAsOf` or something like that? I am not sure if we > >> do > >>>>>> that > >>>>>>>> in our codebase though. Maybe the experts can chime in. > >>>>>>>> > >>>>>>>> Because it is a new method that will be added, we should be able > to > >>>>>> name it > >>>>>>>> whatever we like. I agree `getAsOf` is clearer, albeit wordier. > >>>>>>>> Introducing `getAsOf(key, timestamp)` means we could leave open > >>>>>> `get(key, > >>>>>>>> timeFrom, timeTo)` to have an exclusive `timeTo` without > >> introducing a > >>>>>>>> collision.
(We could introduce `getBetween(key, timeFrom, timeTo)` > >>>>>> instead > >>>>>>>> to delineate even more clearly, though this is better left for a > >>>> future > >>>>>>>> KIP.) > >>>>>>>> > >>>>>>>> I don't think there's any existing precedent in the codebase to follow > >>>> here, > >>>>>> but > >>>>>>>> I'll leave that to the experts. Curious to hear what others prefer > >> as > >>>>>> well. > >>>>>>>> > >>>>>>>>> Sagar: With delete, we would still keep the older versions of the > >> key, > >>>>>>>> right? > >>>>>>>> > >>>>>>>> We could certainly choose this for the semantics of delete(...) -- > >> and > >>>>>> it > >>>>>>>> sounds like we should too, based on Bruno's confirmation below > that > >>>> this > >>>>>>>> feels more natural to him as well -- but as Bruno noted in his > >> message > >>>>>>>> below I think we'll want the method signature to be `delete(key, > >>>>>>>> timestamp)` then, so that there is an explicit timestamp to > >> associate > >>>>>> with > >>>>>>>> the deletion. In other words, `delete(key, timestamp)` has the > same > >>>>>> effect > >>>>>>>> as `put(key, null, timestamp)`. The only difference is that the > >>>>>> `put(...)` > >>>>>>>> method has a `void` return type, while `delete(key, timestamp)` > can > >>>> have > >>>>>>>> `ValueAndTimestamp` as return type in order to return the record > >> which > >>>>>> is > >>>>>>>> replaced (if any). In other words, `delete(key, timestamp)` is > >>>>>> equivalent > >>>>>>>> to `put(key, null, timestamp)` followed by `get(key, timestamp)`. > >>>>>>>> > >>>>>>>>> Bruno: I would also not change the semantics so that it deletes > all > >>>>>>>> versions of > >>>>>>>> a key. I would rather add a new method purge(key) or > >>>>>>>> deleteAllVersions(key) or similar if we want to have such a method > >> in > >>>>>>>> this first KIP. > >>>>>>>> > >>>>>>>> Makes sense; I'm convinced. Let's defer > >>>>>>>> `purge(key)`/`deleteAllVersions(key)` to a future KIP.
If there's > >>>>>> agreement > >>>>>>>> that `delete(key, timestamp)` (as described above) is valuable, we > >> can > >>>>>> keep > >>>>>>>> it in this first KIP even though it is syntactic sugar. If this > >> turns > >>>>>> into > >>>>>>>> a larger discussion, we can defer this to a future KIP as well. > >>>>>>>> > >>>>>>>>> Bruno: I would treat the history retention as a strict limit. > [...] > >>>> You > >>>>>>>> could also add historyRetentionMs() to the > VersionedKeyValueStore<K, > >>>> V> > >>>>>>>> interface to make the concept of the history retention part of the > >>>>>>>> interface. > >>>>>>>> > >>>>>>>> OK. That's the second vote for rewording the javadoc for > >>>>>>>> `VersionedKeyValueStore#get(key, timestampTo)` to remove the > >>>>>> parenthetical > >>>>>>>> and clarify that history retention should be used to dictate this > >>>> case, > >>>>>> so > >>>>>>>> I'll go ahead and do that. I'll leave out adding > >>>> `historyRetentionMs()` > >>>>>> to > >>>>>>>> the interface for now, though, for the sake of consistency with > >> other > >>>>>>>> stores (e.g., window stores) which don't expose similar types of > >>>>>>>> configurations from their interfaces. > >>>>>>>> > >>>>>>>>> Bruno: exclusive vs inclusive regarding validTo timestamp in > get(). > >>>>>>>> Doesn't this decision depend on the semantics of the join for > which > >>>> this > >>>>>>>> state store should be used? > >>>>>>>> > >>>>>>>> Yes, you are correct. As a user I would expect that a stream-side > >>>> record > >>>>>>>> with the same timestamp as a table-side record _would_ produce a > >> join > >>>>>>>> result, which is consistent with the proposal for timestampTo to > be > >>>>>>>> inclusive. (FWIW I tried this out with a Flink temporal join just > >> now > >>>>>> and > >>>>>>>> observed this result as well. Not sure where to look for other > >>>>>> standards to > >>>>>>>> validate this expectation.) 
> >>>>>>>> > >>>>>>>>> Bruno: If Streams does not update min.compaction.lag.ms during > >>>>>>>> rebalances, > >>>>>>>> users have to do it each time they change history retention in the > >>>> code, > >>>>>>>> right? That seems odd to me. What is the actual reason for not > >>>> updating > >>>>>>>> the config? How does Streams handle updates to windowed stores? > >>>>>>>> > >>>>>>>> Yes, users will have to update min.compaction.lag.ms for the > >>>> changelog > >>>>>>>> topic themselves if they update history retention in their code. > >> This > >>>> is > >>>>>>>> consistent with what happens for window stores today: e.g., if a > >> user > >>>>>>>> updates grace period for a windowed aggregation, then they are > >>>>>> responsible > >>>>>>>> for updating retention.ms on their windowed changelog topic as > >> well. > >>>>>>>> > >>>>>>>> I'm not familiar with the historical context around why this is > the > >>>>>> case -- > >>>>>>>> Matthias, do you know? > >>>>>>>> > >>>>>>>> My best guess is that Streams does not want to interfere with any > >>>>>> potential > >>>>>>>> out-of-band changes by the user between application restarts, > though > >>>> I'm > >>>>>>>> not sure why a user would want to change this specific config to a > >>>> value > >>>>>>>> which does not accord with the specified history retention. I > notice > >>>>>> that > >>>>>>>> there is code for validating topic configs and collecting > validation > >>>>>> errors > >>>>>>>> ( > >>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> > https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L318-L319 > >>>>>>>> ) > >>>>>>>> but this method is not called from anywhere, even though there are > >>>> unit > >>>>>>>> tests for it. I was unable to find history of this validation > after > >> a > >>>>>> quick > >>>>>>>> search. 
Hopefully Matthias (or others) has context; otherwise I > will > >>>>>> have a > >>>>>>>> closer look. > >>>>>>>> > >>>>>>>> - Victoria > >>>>>>>> > >>>>>>>> On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna <cado...@apache.org > > > >>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi all, > >>>>>>>>> > >>>>>>>>> Thanks for the KIP, Victoria! > >>>>>>>>> > >>>>>>>>> I have a couple of comments. > >>>>>>>>> > >>>>>>>>> 1. delete(key) > >>>>>>>>> I think delete(key) should not remove all versions of a key. We > >>>> should > >>>>>>>>> use it to close the validity interval of the last version. > >>>>>>>>> Assuming we have records of different versions for key A: > >>>>>>>>> (A, e, 0, 2), > >>>>>>>>> (A, f, 2, 3), > >>>>>>>>> (A, g, 3, MAX) > >>>>>>>>> > >>>>>>>>> delete(A) would update them to > >>>>>>>>> > >>>>>>>>> (A, e, 0, 2), > >>>>>>>>> (A, f, 2, 3), > >>>>>>>>> (A, g, 3, 5), > >>>>>>>>> (A, null, 5, MAX) > >>>>>>>>> > >>>>>>>>> But then the question arises: where does timestamp 5, which closes > the > >>>>>>>>> interval in (A, g, 3, 5) and opens the interval in (A, null, 5, > >> MAX), > >>>>>>>>> come from? We could use the timestamp at which delete(A) is > called, > >>>> but > >>>>>>>>> actually I do not like that because it seems to me it opens the > >> doors > >>>>>> to > >>>>>>>>> non-determinism. If we use event time for put() we should also > use > >> it > >>>>>>>>> for delete(). Actually, put(A, null, 5) would have the same > effect > >> as > >>>>>>>>> delete(A) in the example above. As syntactic sugar, we could > >> add > >>>>>>>>> delete(key, validFrom). (I just realized that I repeated > >>>> what > >>>>>>>>> Victoria said in her previous e-mail.) > >>>>>>>>> I agree with Victoria that delete(A) as defined for other state > >>>> stores > >>>>>>>>> is hard to re-use in the versioned key-value store. > >>>>>>>>> I would also not change the semantics so that it deletes all > >> versions > >>>>>> of > >>>>>>>>> a key.
I would rather add a new method purge(key) or > >>>>>>>>> deleteAllVersions(key) or similar if we want to have such a > method > >> in > >>>>>>>>> this first KIP. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 2. history retention > >>>>>>>>> I would remove "(up to store implementation discretion when this > is > >>>> the > >>>>>>>>> case)". I would treat the history retention as a strict limit. If > >>>> users > >>>>>>>>> want to implement a less strict behavior, they can still do it. > >> Maybe > >>>>>>>>> mention in the Javadocs the implications of not adhering strictly > >> to > >>>>>> the > >>>>>>>>> history retention. That is, the DSL might become > non-deterministic. > >>>> You > >>>>>>>>> could also add historyRetentionMs() to the > >> VersionedKeyValueStore<K, > >>>> V> > >>>>>>>>> interface to make the concept of the history retention part of > the > >>>>>>>>> interface. > >>>>>>>>> > >>>>>>>>> 3. null vs. exception for out-of-bound queries > >>>>>>>>> I am in favor of null. The record version is not there anymore > >>>> because > >>>>>>>>> it expired. This seems normal to me and nothing exceptional. That > >>>> would > >>>>>>>>> also be consistent with the behavior of other APIs as already > >> mentioned. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 4. Exposing segmentInterval > >>>>>>>>> Since we have evidence that the segment interval affects > >>>> performance, I > >>>>>>>>> would expose it. But I also find it OK to expose it once we have > a > >>>>>>>>> corresponding metric. > >>>>>>>>> > >>>>>>>>> 5. exclusive vs inclusive regarding validTo timestamp in get() > >>>>>>>>> Doesn't this decision depend on the semantics of the join for > which > >>>>>> this > >>>>>>>>> state store should be used? Should a record on the table side > that > >>>> has > >>>>>>>>> the same timestamp as the record on the stream side join? Or > should > >>>>>> only > >>>>>>>>> records in the table that are strictly before the record on the > >>>> stream > >>>>>>>>> side join?
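Bruno's delete example can be reproduced by deriving validity intervals from the version timestamps. Each version is valid until the next version's timestamp, and the newest one stays open until MAX. The sketch below uses hypothetical class and method names. It shows that writing a tombstone with `put(A, null, 5)` closes `g`'s interval at 5 without touching older versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: turns timestamp -> value versions into (value, validFrom, validTo) rows.
final class VersionIntervals {
    static final class Row {
        final String value;   // null marks a tombstone
        final long validFrom; // inclusive
        final long validTo;   // exclusive; Long.MAX_VALUE while still the latest version

        Row(final String value, final long validFrom, final long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }

        @Override
        public String toString() {
            final String to = validTo == Long.MAX_VALUE ? "MAX" : String.valueOf(validTo);
            return "(" + value + ", " + validFrom + ", " + to + ")";
        }
    }

    // Each version is valid until the next version's timestamp; the newest stays open (MAX).
    static List<Row> intervals(final TreeMap<Long, String> versions) {
        final List<Row> rows = new ArrayList<>();
        Map.Entry<Long, String> current = versions.firstEntry();
        while (current != null) {
            final Map.Entry<Long, String> next = versions.higherEntry(current.getKey());
            final long validTo = next == null ? Long.MAX_VALUE : next.getKey();
            rows.add(new Row(current.getValue(), current.getKey(), validTo));
            current = next;
        }
        return rows;
    }
}
```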
> >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 6. Not setting min.compaction.lag.ms during rebalances > >>>>>>>>> If Streams does not update min.compaction.lag.ms during > >> rebalances, > >>>>>>>>> users have to do it each time they change history retention in > the > >>>>>> code, > >>>>>>>>> right? That seems odd to me. What is the actual reason for not > >>>> updating > >>>>>>>>> the config? How does Streams handle updates to windowed stores? > >> That > >>>>>>>>> should be a similar situation for the retention time config of > the > >>>>>>>>> changelog topic. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Bruno > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 23.11.22 09:11, Sagar wrote: > >>>>>>>>>> Hi Vicky, > >>>>>>>>>> > >>>>>>>>>> Thanks for your response! > >>>>>>>>>> > >>>>>>>>>> I'll just use numbers to refer to your comments. > >>>>>>>>>> > >>>>>>>>>> 1) Thanks for your response. I am not totally sure either whether > >>>> these > >>>>>>>>>> should be supported via IQv2 or via the store interface. That said, > I > >>>>>>>>> definitely wouldn't > >>>>>>>>>> qualify this as blocking the KIP, so we can > >> live > >>>>>>>>>> without it :) > >>>>>>>>>> > >>>>>>>>>> 2) Yeah, if the 2 APIs for get have different semantics for > >>>>>> timestampTo, > >>>>>>>>>> then it could be confusing. I went through the link for temporal > >>>>>> tables > >>>>>>>>>> (TFS!) and I now get why the AS OF semantics would have it > >>>> inclusive. > >>>>>> I > >>>>>>>>>> think part of the problem is that the name get on its own is > not > >> as > >>>>>>>>>> expressive as SQL. Can we name according to the semantics that > you > >>>>>> want > >>>>>>>>> to > >>>>>>>>>> support like `getAsOf` or something like that? I am not sure if > we > >>>> do > >>>>>>>>> that > >>>>>>>>>> in our codebase though. Maybe the experts can chime in. > >>>>>>>>>> > >>>>>>>>>> 3) hmm I would have named it `validUpto`, but again, I'm not very > picky > >>>>>> about > >>>>>>>>> it.
> >>>>>>>>>> After going through the link and your KIP, it's a lot clearer to > >> me. > >>>>>>>>>> > >>>>>>>>>> 4) I think delete(key) should be sufficient. With delete, we > would > >>>>>>>>>> still keep the older versions of the key, right? > >>>>>>>>>> > >>>>>>>>>> Thanks! > >>>>>>>>>> Sagar. > >>>>>>>>>> > >>>>>>>>>> On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia > >>>>>>>>>> <victoria....@confluent.io.invalid> wrote: > >>>>>>>>>> > >>>>>>>>>>> Thanks, Matthias and Sagar, for your comments! I've responded > >> here > >>>>>> for > >>>>>>>>> now, > >>>>>>>>>>> and will update the KIP afterwards with the outcome of our > >>>>>> discussions > >>>>>>>>> as > >>>>>>>>>>> they resolve. > >>>>>>>>>>> > >>>>>>>>>>> ----------- Matthias's comments ----------- > >>>>>>>>>>> > >>>>>>>>>>>> (1) Why does the new store not extend KeyValueStore, but > >>>> StateStore? > >>>>>>>>>>> In the end, it's a KeyValueStore? > >>>>>>>>>>> > >>>>>>>>>>> A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>` > >>>>>>>> because > >>>>>>>>>>> many of the KeyValueStore methods would not make sense for a > >>>>>> versioned > >>>>>>>>>>> store. For example, `put(K key, V value)` is not meaningful > for a > >>>>>>>>> versioned > >>>>>>>>>>> store because the record needs a timestamp associated with it. > >>>>>>>>>>> > >>>>>>>>>>> A `VersionedKeyValueStore<K, V>` is more similar to a > >>>>>>>> `KeyValueStore<K, > >>>>>>>>>>> ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, > V>`), > >>>> but > >>>>>>>>> some > >>>>>>>>>>> of the TimestampedKeyValueStore methods are still problematic. > >> For > >>>>>>>>> example, > >>>>>>>>>>> what does it mean for `delete(K key)` to have return type > >>>>>>>>>>> `ValueAndTimestamp<V>`? Does this mean that `delete(K key)` > only > >>>>>>>> deletes > >>>>>>>>>>> (and returns) the latest record version for the key?
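To show why `put(K key, V value)` does not fit, here is a trimmed-down, hypothetical interface shape in which every write carries a timestamp. The method set is illustrative, not the KIP's final one. A map-backed implementation is included for testing, and reads return the bare value for brevity. The sketch also shows the `get(key)` convenience overload from (4a) as a default method that delegates to `get(key, Long.MAX_VALUE)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, trimmed-down interface shape; not the final KIP-889 method set.
interface VersionedStoreShape<K, V> {
    // Every write carries a timestamp, so KeyValueStore#put(K, V) has no natural meaning here.
    void put(K key, V value, long timestamp);

    // Point-in-time lookup (inclusive bound); null if no live version exists at asOfTimestamp.
    V get(K key, long asOfTimestamp);

    // Latest value: syntactic sugar for get(key, Long.MAX_VALUE).
    default V get(final K key) {
        return get(key, Long.MAX_VALUE);
    }
}

// Minimal map-backed implementation used only to exercise the interface shape.
final class MapBackedVersionedStore<K, V> implements VersionedStoreShape<K, V> {
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    @Override
    public void put(final K key, final V value, final long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    @Override
    public V get(final K key, final long asOfTimestamp) {
        final TreeMap<Long, V> byTs = versions.get(key);
        if (byTs == null) return null;
        final Map.Entry<Long, V> entry = byTs.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue(); // a tombstone reads as null
    }
}
```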
Probably > we > >>>> want > >>>>>>>> a > >>>>>>>>>>> versioned store to have `delete(K key)` delete all record > >> versions > >>>>>> for > >>>>>>>>> the > >>>>>>>>>>> given key, in which case the return type is better suited as an > >>>>>>>>>>> iterator/collection of KeyValueTimestamp. `putIfAbsent(K key, > >>>>>>>>>>> ValueAndTimestamp value)` also has ambiguous semantics for > >>>> versioned > >>>>>>>>> stores > >>>>>>>>>>> (i.e., what does it mean for the key/record to be "absent"?). > >>>>>>>>>>> > >>>>>>>>>>> I agree that conceptually a versioned key-value store is just a > >>>>>>>>> key-value > >>>>>>>>>>> store, though. In the future, if we redesign the store > interfaces, > >>>>>> it'd > >>>>>>>>> be > >>>>>>>>>>> great to unify them by having a more generic KeyValueStore > >>>> interface > >>>>>>>>> that > >>>>>>>>>>> allows for extra flexibility to support different types of > >>>> key-value > >>>>>>>>>>> stores, including versioned stores. (Or, if you can think of a > >> way > >>>> to > >>>>>>>>>>> achieve this with the existing interfaces today, I'm all ears!) > >>>>>>>>>>> > >>>>>>>>>>>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if > we > >>>>>> don't > >>>>>>>>>>> want to support IQ in this KIP, it might be good to add this > >>>>>> interface > >>>>>>>>>>> right away to avoid complications for follow-up KIPs? Or won't > >>>> there > >>>>>>>> be > >>>>>>>>>>> any complications anyway? > >>>>>>>>>>> > >>>>>>>>>>> I don't think there will be complications for refactoring to > add > >>>> this > >>>>>>>>>>> interface in the future. Refactoring out > >>>>>>>> ReadOnlyVersionedKeyValueStore > >>>>>>>>>>> from VersionedKeyValueStore would leave VersionedKeyValueStore > >>>>>>>> unchanged > >>>>>>>>>>> from the outside. > >>>>>>>>>>> > >>>>>>>>>>> Also, is it true that the ReadOnlyKeyValueStore interface is > only > >>>>>> used > >>>>>>>>> for > >>>>>>>>>>> IQv1 and not IQv2?
I think it's an open question as to whether > we > >>>>>>>> should > >>>>>>>>>>> support IQv1 for versioned stores or only IQv2. If the latter, > >> then > >>>>>>>>> maybe > >>>>>>>>>>> we won't need the extra interface at all. > >>>>>>>>>>> > >>>>>>>>>>>> (3) Why do we not have a `delete(key)` method? I am ok with > not > >>>>>>>>>>> supporting all methods from the existing KV-store, but a > >> `delete(key)` > >>>>>>>> seems > >>>>>>>>>>> to be fundamental to have? > >>>>>>>>>>> > >>>>>>>>>>> What do you think the semantics of `delete(key)` should be for > >>>>>>>> versioned > >>>>>>>>>>> stores? Should `delete(key)` delete (and return) all record > >>>> versions > >>>>>>>> for > >>>>>>>>>>> the key? Or should we have `delete(key, timestamp)`, which is > >>>>>>>> equivalent > >>>>>>>>> to > >>>>>>>>>>> `put(key, null, timestamp)` except with a return type to return > >>>>>>>>>>> ValueAndTimestamp representing the record it replaced? > >>>>>>>>>>> > >>>>>>>>>>> If we have ready alignment on what the interface and semantics > >> for > >>>>>>>>>>> `delete(key)` should be, then adding it in this KIP sounds > good. > >> I > >>>>>>>> just > >>>>>>>>>>> didn't want the rest of the KIP to be hung up over additional > >>>>>>>>> interfaces, > >>>>>>>>>>> given that we can always add extra interfaces in the future. > >>>>>>>>>>> > >>>>>>>>>>>> (4a) Do we need `get(key)`? It seems to be the same as > `get(key, > >>>>>>>>>>> MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? > >> Just > >>>>>> for > >>>>>>>>>>> my own clarification (should we add something to the > JavaDocs?). > >>>>>>>>>>> > >>>>>>>>>>> Correct, it is just syntactic sugar. I will add a clarification > >>>> into > >>>>>>>> the > >>>>>>>>>>> Javadocs as you've suggested. > >>>>>>>>>>> > >>>>>>>>>>>> (4b) Should we throw an exception if a user queries > out-of-bound > >>>>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>>> -> You put it into "rejected alternatives", and I > >> understand > >>>>>> your > >>>>>>>>>>> argument. Would love to get input from others about this > question > >>>>>>>>>>> though. -- It seems we also return `null` for windowed stores, > so > >>>>>>>> maybe > >>>>>>>>>>> the strongest argument is to align to existing behavior? Or do > we > >>>>>> have > >>>>>>>>>>> case for which the current behavior is problematic? > >>>>>>>>>>> > >>>>>>>>>>> Sure; curious to hear what others think as well. > >>>>>>>>>>> > >>>>>>>>>>>> (4c) JavaDoc on `get(key,ts)` says: "(up to store > implementation > >>>>>>>>>>> discretion when this is the case)" -> Should we make it a > >> stricter > >>>>>>>>>>> contract such that the user can reason about it better (there > is > >>>> WIP > >>>>>>>> to > >>>>>>>>>>> make retention time a strict bound for windowed stores atm) > >>>>>>>>>>> -> JavaDocs on `persistentVersionedKeyValueStore` > seems to > >>>>>>>> suggest a > >>>>>>>>>>> strict bound, too. > >>>>>>>>>>> > >>>>>>>>>>> Ah, great question. I think the question boils down to: do we > >> want > >>>> to > >>>>>>>>>>> require that all versioned stores (including custom user > >>>>>>>>> implementations) > >>>>>>>>>>> use "history retention" to determine when to expire old record > >>>>>>>> versions? > >>>>>>>>>>> > >>>>>>>>>>> Because the `persistentVersionedKeyValueStore(...)` method > >> returns > >>>>>>>>>>> instances of the provided RocksDB-based versioned store > >>>>>>>> implementation, > >>>>>>>>>>> which does use history retention for this purpose, that's why > we > >>>> can > >>>>>>>>> very > >>>>>>>>>>> clearly say that for this store, `get(key, ts)` will return > null > >> if > >>>>>>>> the > >>>>>>>>>>> provided timestamp bound has fallen out of history retention. 
> The > >>>>>>>>> reason I > >>>>>>>>>>> left the `VersionedKeyValueStore#get(key, ts)` Javadoc more > >> generic > >>>>>>>>> (i.e., > >>>>>>>>>>> does not mention history retention) is because maybe a user > >>>>>>>> implementing > >>>>>>>>>>> their own custom store will choose a different expiry > mechanism, > >>>>>> e.g., > >>>>>>>>> keep > >>>>>>>>>>> the three latest versions for each key regardless of how old > the > >>>>>>>>> timestamps > >>>>>>>>>>> are. > >>>>>>>>>>> > >>>>>>>>>>> If we want to require that all versioned stores use history > >>>> retention > >>>>>>>> in > >>>>>>>>>>> order to determine when to expire old records, then I will > >>>> certainly > >>>>>>>>> update > >>>>>>>>>>> the Javadoc to clarify. This is already a requirement for DSL > >> users > >>>>>>>>> because > >>>>>>>>>>> the VersionedBytesStoreSupplier interface requires history > >>>> retention > >>>>>>>> to > >>>>>>>>> be > >>>>>>>>>>> provided (in order for changelog topic configs to be properly > >> set), > >>>>>> so > >>>>>>>>> it's > >>>>>>>>>>> just a question of whether we also want to require PAPI users > to > >>>> use > >>>>>>>>>>> history retention too. I had a look at the existing window > stores > >>>> and > >>>>>>>>>>> didn't see precedent for requiring all window stores have a > >>>> standard > >>>>>>>>>>> "retention time" concept for how long to keep windows, but if > we > >>>> want > >>>>>>>> to > >>>>>>>>>>> have a standard "history retention" concept for versioned > stores > >> we > >>>>>>>>>>> certainly can. WDYT? > >>>>>>>>>>> > >>>>>>>>>>>> (5a) Do we need to expose `segmentInterval`? For > >> windowed-stores, > >>>> we > >>>>>>>>>>> also use segments but hard-code it to two (it was exposed in > >>>> earlier > >>>>>>>>>>> versions but it seems not useful, even if we would be open to > >>>> expose > >>>>>>>> it > >>>>>>>>>>> again if there is user demand). 
> >>>>>>>>>>> > >>>>>>>>>>> If we want to leave it out of this first KIP (and potentially > >>>> expose > >>>>>>>> it > >>>>>>>>> in > >>>>>>>>>>> the future), that works for me. The performance benchmarks I > ran > >>>>>>>> suggest > >>>>>>>>>>> that this parameter greatly impacts store performance though > and > >> is > >>>>>>>> very > >>>>>>>>>>> workload dependent. If a user reported poor performance using > >>>>>>>> versioned > >>>>>>>>>>> stores for their workload, this is the first parameter I would > >> want > >>>>>> to > >>>>>>>>>>> tune. That said, metrics/observability for versioned stores > >> (which > >>>>>>>>> would be > >>>>>>>>>>> helpful for determining how this parameter should be adjusted) > >> have > >>>>>>>> been > >>>>>>>>>>> deferred to a follow-up KIP, so perhaps that's reason to defer > >>>>>>>> exposing > >>>>>>>>>>> this parameter as well. > >>>>>>>>>>> > >>>>>>>>>>>> (5b) JavaDocs says: "Performance degrades as more record > >> versions > >>>>>> for > >>>>>>>>>>> the same key are collected in a single segment. On the other > >> hand, > >>>>>>>>>>> out-of-order writes and reads which access older segments may > >> slow > >>>>>>>> down > >>>>>>>>>>> if there are too many segments." -- Wondering if JavaDocs > should > >>>> make > >>>>>>>>>>> any statements about expected performance? Seems to be an > >>>>>>>> implementation > >>>>>>>>>>> detail? > >>>>>>>>>>> > >>>>>>>>>>> I included this sentence to explain why a user might want to > tune > >>>>>> this > >>>>>>>>>>> value / help guide how to think about the parameter, but if we > >> want > >>>>>> to > >>>>>>>>>>> remove it entirely (per the discussion point above) then this > >>>> Javadoc > >>>>>>>>> will > >>>>>>>>>>> be removed with it. > >>>>>>>>>>> > >>>>>>>>>>>> (6) validTo timestamp is "exclusive", right? Ie, if I query > >>>>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next" > record > >>>> v2 > >>>>>>>>>>> with validFromV2=ts? 
> >>>>>>>>>>> > >>>>>>>>>>> I actually intended for it to be inclusive (will update the > KIP). > >>>> Do > >>>>>>>> you > >>>>>>>>>>> think exclusive is more intuitive? The reason I had inclusive > in > >> my > >>>>>>>>> mind is > >>>>>>>>>>> because it's like a "AS OF <time>" query, which treats the time > >>>> bound > >>>>>>>> as > >>>>>>>>>>> inclusive. > >>>>>>>>>>> > >>>>>>>>>>>> (7) The KIP says, that segments are stores in the same RocksDB > >> -- > >>>>>> for > >>>>>>>>>>> this case, how are efficient deletes handled? For > windowed-store, > >>>> we > >>>>>>>> can > >>>>>>>>>>> just delete a full RocksDB. > >>>>>>>>>>> > >>>>>>>>>>> The way that multiple segments are represented in the same > >> RocksDB > >>>> is > >>>>>>>>> that > >>>>>>>>>>> the RocksDB keys are prefixed with segment ID. An entire > segment > >> is > >>>>>>>>> deleted > >>>>>>>>>>> with a single `deleteRange()` call to RocksDB. > >>>>>>>>>>> > >>>>>>>>>>>> (8) Rejected alternatives: you propose to not return the > validTo > >>>>>>>>>>> timestamp -- if we find it useful in the future to return it, > >> would > >>>>>>>>>>> there be a clean path to change it accordingly? > >>>>>>>>>>> > >>>>>>>>>>> With the current proposal, there's no clean path. If we think > >>>> there's > >>>>>>>> a > >>>>>>>>>>> good chance we might want to do this in the future, then we > >> should > >>>>>>>>> update > >>>>>>>>>>> the proposed interfaces. > >>>>>>>>>>> > >>>>>>>>>>> The current proposed return type from > `VersionedKeyValueStore<K, > >>>>>>>>>>> V>#get(key, tsTo)` is `ValueAndTimestamp<V>`. There's no way to > >>>> add a > >>>>>>>>>>> second timestamp into `ValueAndTimestamp<V>`, which is why > >> there's > >>>> no > >>>>>>>>> clean > >>>>>>>>>>> path to include validTo timestamp in the future under the > >> existing > >>>>>>>>>>> proposal. 
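To make points (6) and (8) concrete, here is a minimal, hypothetical in-memory Java sketch. It is not the Kafka Streams API or implementation, and every class, method, and field name in it is invented for illustration. It assumes the inclusive "AS OF" reading described above: `get(key, asOfTimestamp)` returns the version with the largest timestamp less than or equal to the bound, so a query at exactly v2's timestamp returns v2. It also treats a null value as a tombstone and implements `get(key)` as sugar for `get(key, Long.MAX_VALUE)`, per (4a). History retention and expiry are not modeled. The returned object carries only a value and a timestamp. The store can work out validTo (the next version's timestamp) through the hypothetical `validToOf` helper, but the return type has no field for it, which is the limitation raised in (8).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch for illustration only: NOT the Kafka Streams API or implementation.
final class VersionedStoreSketch<K, V> {

    // Hypothetical result type: a value plus the timestamp it became valid at (validFrom).
    // Like a value-and-timestamp pair, it has no field in which to return validTo.
    static final class Version<T> {
        final T value;
        final long timestamp;

        Version(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // For each key, all versions ordered by timestamp; a null value is a tombstone.
    private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

    void put(K key, V value, long timestamp) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // "AS OF" lookup with an inclusive bound: the latest version whose timestamp <= asOfTimestamp.
    Version<V> get(K key, long asOfTimestamp) {
        TreeMap<Long, V> versions = history.get(key);
        if (versions == null) {
            return null; // key never written
        }
        Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) {
            return null; // no version that early, or the key was deleted as of that time
        }
        return new Version<>(entry.getValue(), entry.getKey());
    }

    // Syntactic sugar for the latest version, as discussed in (4a).
    Version<V> get(K key) {
        return get(key, Long.MAX_VALUE);
    }

    // validTo of the version current as of asOfTimestamp: the next version's timestamp,
    // or Long.MAX_VALUE if there is none. Computable here, but not returnable via Version.
    long validToOf(K key, long asOfTimestamp) {
        TreeMap<Long, V> versions = history.get(key);
        if (versions == null) {
            return Long.MAX_VALUE;
        }
        Long next = versions.higherKey(asOfTimestamp);
        return next == null ? Long.MAX_VALUE : next;
    }
}
```

For example, after `put("k", "v1", 10L)` and `put("k", "v2", 20L)`, the query `get("k", 15L)` yields v1 with timestamp 10, `get("k", 20L)` yields v2, and `validToOf("k", 15L)` is 20. Exposing that last number would need an extra field on the result type, which is the kind of change the `VersionedRecord` idea is meant to allow.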
If we wanted to allow for including the validTo timestamp in the future, we'd instead update the return type to be a new `VersionedRecord<V>` object. Today a `VersionedRecord<V>` could just include `value` and `timestamp`, and in the future we could add `validTo` (names subject to change) into the `VersionedRecord` as well. (It'd look a little strange for now since VersionedRecord is the same as ValueAndTimestamp, but that seems fine.)

If we choose to do this, I think we should also update the return type of `VersionedKeyValueStore#get(key)` to be VersionedRecord as well, rather than having one return ValueAndTimestamp while the other returns VersionedRecord.

----------- Sagar's comments -----------

> 1) Did you consider adding a method similar to:
> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> I think this could be useful considering that this versioning scheme unlocks time travel on a per-key basis. WDYT?

Yes, I do think this method is valuable. I think we will definitely want to support time-range based queries at some point (hopefully soon), and likely also key-range based queries (to achieve feature parity with existing key-value stores).

It's not immediately clear to me whether these types of queries should be supported as part of the store interface or if they should only be supported via the `query(...)` method for IQv2. (It's an open question as to whether we should support IQv1 for versioned stores or only IQv2. A benefit of IQv2 over IQv1 is that we won't need to add individual store methods for each type of query, including for all wrapped store layers.)

If we have clear non-IQ use cases for these methods (e.g., use cases within processors), then they'll need to be added as part of the store interface for sure. I'm leaning towards adding them as part of the store interface, but given the ambiguity here, it may be preferable to defer to a follow-up KIP. OTOH, if you think the versioned store interface as proposed in this KIP is too bare-bones to be useful, I'm open to adding it in now as well.

> 2) I have a similar question to Matthias's about the timestampTo argument when doing a get. Is it inclusive or exclusive?

Same answer (and follow-up question) as above. Do you think it will be confusing for `get(key, tsTo)` to use an inclusive time bound, while `get(key, tsFrom, tsTo)` would use an exclusive tsTo time bound? Maybe we should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or `getRange(...)` in order to avoid confusion.

> 3) validFrom sounds slightly confusing to me. It is essentially the timestamp at which the record was inserted. validFrom makes it sound like validTo, which can keep changing based on new records, while *from* is fixed. WDYT?

"It is essentially the timestamp at which the record was inserted" <-- Yes, that's correct.

I borrowed the "validFrom/validTo" terminology from temporal tables, e.g., https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16 . I don't believe the terms "validFrom" or "validTo" are currently exposed anywhere in any of the user-facing interfaces (or Javadocs); I just needed a way to refer to the concepts in the KIP. Hopefully this is a non-issue (at least for now) as a result. Do you have a suggestion for terminology that would've been less confusing?

> 4) I also think the delete API should be supported.

Makes sense. It'd be good to get your input on the same follow-up questions I asked Matthias above as well :)

On Tue, Nov 22, 2022 at 4:25 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Victoria,
>
> Thanks for the KIP. Seems like a very interesting idea!
>
> I have a couple of questions:
>
> 1) Did you consider adding a method similar to:
> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
>
> I think this could be useful considering that this versioning scheme unlocks time travel on a per-key basis. WDYT?
>
> 2) I have a similar question to Matthias's about the timestampTo argument when doing a get. Is it inclusive or exclusive?
>
> 3) validFrom sounds slightly confusing to me. It is essentially the timestamp at which the record was inserted. validFrom makes it sound like validTo, which can keep changing based on new records, while *from* is fixed. WDYT?
>
> 4) I also think the delete API should be supported.
>
> Thanks!
> Sagar.
>
> On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Thanks for the KIP, Victoria. Very well written!
>>
>> A couple of questions (many might just require adding some more details to the KIP):
>>
>> (1) Why does the new store not extend KeyValueStore, but StateStore? In the end, it's a KeyValueStore?
>>
>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't want to support IQ in this KIP, it might be good to add this interface right away to avoid complications for follow-up KIPs? Or won't there be any complications anyway?
>>
>> (3) Why do we not have a `delete(key)` method? I am ok with not supporting all methods from existing KV-store, but a `delete(key)` seems to be fundamental to have?
>>
>> (4a) Do we need `get(key)`? It seems to be the same as `get(key, MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? Just for my own clarification (should we add something to the JavaDocs?).
>>
>> (4b) Should we throw an exception if a user queries out-of-bound instead of returning `null` (in `get(key,ts)`)?
>> -> You put it into "rejected alternatives", and I understand your argument. Would love to get input from others about this question though. -- It seems we also return `null` for windowed stores, so maybe the strongest argument is to align with existing behavior? Or do we have a case for which the current behavior is problematic?
>>
>> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation discretion when this is the case)" -> Should we make it a stricter contract such that the user can reason about it better (there is WIP to make retention time a strict bound for windowed stores atm)
>> -> JavaDocs on `persistentVersionedKeyValueStore` seem to suggest a strict bound, too.
>>
>> (5a) Do we need to expose `segmentInterval`? For windowed-stores, we also use segments but hard-code it to two (it was exposed in earlier versions but it seems not useful, though we would be open to exposing it again if there is user demand).
>>
>> (5b) JavaDocs say: "Performance degrades as more record versions for the same key are collected in a single segment. On the other hand, out-of-order writes and reads which access older segments may slow down if there are too many segments." -- Wondering if JavaDocs should make any statements about expected performance? Seems to be an implementation detail?
>>
>> (6) validTo timestamp is "exclusive", right? I.e., if I query `get(key,ts[=validToV1])` I would get `null` or the "next" record v2 with validFromV2=ts?
>>
>> (7) The KIP says that segments are stored in the same RocksDB -- for this case, how are efficient deletes handled? For windowed stores, we can just delete a full RocksDB.
>>
>> (8) Rejected alternatives: you propose to not return the validTo timestamp -- if we find it useful in the future to return it, would there be a clean path to change it accordingly?
>>
>> -Matthias
>>
>> On 11/16/22 9:57 PM, Victoria Xia wrote:
>>> Hi everyone,
>>>
>>> I have a proposal for introducing versioned state stores in Kafka Streams. Versioned state stores are similar to key-value stores except they can store multiple record versions for a single key. This KIP focuses on interfaces only in order to limit the scope of the KIP.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
>>>
>>> Thanks,
>>> Victoria
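The answer to point (7) earlier in the thread describes how several segments share one RocksDB instance. Each key is prefixed with its segment ID, and an entire segment is deleted with a single `deleteRange()` call. Below is a rough in-memory Java sketch of that idea, built on several assumptions. A `TreeMap` stands in for the RocksDB instance. Clearing a `subMap` view over `[prefix(id), prefix(id + 1))` stands in for the range delete. Each version is assigned to the segment covering its own timestamp. These choices and all names are hypothetical simplifications, not the store's actual design.

```java
import java.util.TreeMap;

// Hypothetical sketch for illustration only: NOT the Kafka Streams implementation.
// A TreeMap stands in for a single RocksDB instance shared by all segments.
final class SegmentedKeySpaceSketch<V> {
    private final long segmentInterval;                    // assumed segment width, in ms
    private final TreeMap<String, V> db = new TreeMap<>(); // stand-in for one RocksDB instance

    SegmentedKeySpaceSketch(long segmentInterval) {
        if (segmentInterval <= 0) {
            throw new IllegalArgumentException("segmentInterval must be positive");
        }
        this.segmentInterval = segmentInterval;
    }

    // Simplifying assumption: a version goes into the segment covering its own timestamp.
    // Timestamps are assumed to be non-negative.
    long segmentId(long timestamp) {
        return timestamp / segmentInterval;
    }

    // Fixed-width, zero-padded segment ID so lexicographic key order matches numeric order
    // and all keys of one segment fall in one contiguous range.
    private static String prefix(long segmentId) {
        return String.format("%019d|", segmentId);
    }

    void put(String key, long timestamp, V value) {
        db.put(prefix(segmentId(timestamp)) + key + "|" + String.format("%019d", timestamp), value);
    }

    // Removes a whole segment with one range delete over [prefix(id), prefix(id + 1)),
    // analogous to a RocksDB deleteRange(begin, end) call whose end key is exclusive.
    void dropSegment(long segmentId) {
        db.subMap(prefix(segmentId), true, prefix(segmentId + 1), false).clear();
    }

    int size() {
        return db.size();
    }
}
```

With a `segmentInterval` of 100, for instance, versions at timestamps 10 and 50 share segment 0 and disappear together in one `dropSegment(0)` call. A smaller interval spreads versions across more, smaller segments. The (5b) Javadoc sentence describes why that choice cuts both ways.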