Hi Bruno,

Thanks for reviewing the KIP, and for voting!

> I would make the constructor public and remove the static method make().

You are right. The static factory method provides little benefit for the
VersionedRecord class, so I will remove it and make the constructor public
in order to simplify the class.
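
For illustration only, here is a minimal sketch of how the simplified class
might look once make() is gone. The accessor names and the exact form of the
non-null check are assumptions based on the discussion in this thread, not
the final KIP code:

```java
import java.util.Objects;

// Sketch only (assumption): the real class would be declared public in the
// Kafka Streams API; it is package-private here to keep the snippet standalone.
final class VersionedRecord<V> {
    private final V value;
    private final long timestamp;

    // Public constructor; replaces the static factory make(value, timestamp).
    public VersionedRecord(final V value, final long timestamp) {
        // Read methods return a null VersionedRecord rather than a record
        // holding a null value, so the value itself is required to be non-null.
        this.value = Objects.requireNonNull(value, "value must not be null");
        this.timestamp = timestamp;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }
}
```

Callers would then write `new VersionedRecord<>(value, timestamp)` where they
would previously have written `VersionedRecord.make(value, timestamp)`.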

Best,
Victoria

On Mon, Dec 19, 2022 at 9:34 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Victoria,
>
> I am +1 on the KIP. I just have one minor comment:
> Why do we need method
>
> public static <V> VersionedRecord<V> make(final V value, final long
> timestamp)
>
> in the VersionedRecord?
>
> The public constructor would do exactly the same, wouldn't it?
>
> I would make the constructor public and remove the static method make().
>
> Best,
> Bruno
>
> On 15.12.22 20:58, Victoria Xia wrote:
> > Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've
> > just sent a message to start the vote on this KIP. Please have a look
> when
> > you get the chance.
> >
> > Thanks,
> > Victoria
> >
> > On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> Thanks for clarifying about the null-question. SGTM.
> >>
> >> On 12/13/22 3:06 PM, Victoria Xia wrote:
> >>> Hi Matthias,
> >>>
> >>> Thanks for chiming in! Barring objections from anyone on this thread, I
> >>> will start the vote for this KIP on Thursday. That should be enough
> time
> >> to
> >>> incorporate any lingering minor changes.
> >>>
> >>>> I slightly prefer to add `VersionedRecord` interface (also
> >>> like the name). I agree that it's low overhead and providing a clean
> >>> path forward for future changes seems worth it to me.
> >>>
> >>> OK, that makes two of us. I updated the KIP just now to formally
> include
> >>> VersionedRecord as the new return type from the various
> >>> VersionedKeyValueStore methods.
> >>>
> >>>> if we introduce `VersionedRecord`, I think we can keep the not-null
> >>> requirement for `ValueAndTimestamp`
> >>>
> >>> Not quite. VersionedRecord is only used as a return type from read
> >> methods,
> >>> which is why VersionedRecord is able to enforce that its value is never
> >>> null. If the value being returned would have been null, then we return
> a
> >>> null VersionedRecord instead, rather than a non-null VersionedRecord with
> >>> a null value. So, there's no use case for a VersionedRecord with null
> >> value.
> >>>
> >>> In contrast, even though ValueAndTimestamp is not anywhere in the
> public
> >>> VersionedKeyValueStore interface, ValueAndTimestamp still needs to be
> >> used
> >>> internally when representing a versioned key-value store as a
> >>> TimestampedKeyValueStore, since TimestampedKeyValueStore is used
> >> everywhere
> >>> throughout the internals of the codebase. In order to represent a
> >> versioned
> >>> key-value store as a TimestampedKeyValueStore, we have to support
> `put(K
> >>> key, ValueAndTimestamp<V> value)`, which means ValueAndTimestamp needs
> to
> >>> support null value (with timestamp). Otherwise we cannot put a
> tombstone
> >>> into a versioned key-value store when using the internal
> >>> TimestampedKeyValueStore representation.
> >>>
> >>> It's very much an implementation detail that ValueAndTimestamp needs to
> >> be
> >>> relaxed to allow null values. I think this is a minor enough change
> that
> >> is
> >>> still preferable to the alternatives (refactoring the processors to not
> >>> require TimestampedKeyValueStore, or introducing a separate workaround
> >>> `put()` method on the TimestampedKeyValueStore representation of
> >> versioned
> >>> key-value stores), so I have left it in as part of the KIP.
> >>>
> >>> Best,
> >>> Victoria
> >>>
> >>> On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>
> >>>> Thanks Victoria.
> >>>>
> >>>> I did not re-read the KIP in full on the wiki but only your email.
> >>>>
> >>>> Points (1)-(8) SGTM.
> >>>>
> >>>> About (9): I slightly prefer to add `VersionedRecord` interface (also
> >>>> like the name). I agree that it's low overhead and providing a clean
> >>>> path forward for future changes seems worth it to me. Btw: if we
> >>>> introduce `VersionedRecord`, I think we can keep the not-null
> >>>> requirement for `ValueAndTimestamp`, which seems a small side benefit.
> >>>> (Btw: your code snippet in the KIP shows that `VersionedRecord` would
> >>>> have a non-null requirement for the value, but I think it would need
> to
> >>>> allow null as value?)
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/7/22 5:23 PM, Victoria Xia wrote:
> >>>>> Thanks for the discussion, Bruno, Sagar, and Matthias!
> >>>>>
> >>>>> It seems we've reached consensus on almost all of the discussion
> >> points.
> >>>>> I've updated the KIP with the following:
> >>>>> 1) renamed "timestampTo" in `get(key, timestampTo)` to
> "asOfTimestamp"
> >> to
> >>>>> clarify that this timestamp bound is inclusive, per the SQL guideline
> >>>> that
> >>>>> "AS OF <timestamp>" queries are inclusive. In the future, if we want
> to
> >>>>> introduce a timestamp range query, we can use `get(key,
> timestampFrom,
> >>>>> timestampTo)` and specify that timestampTo is exclusive in this
> method,
> >>>>> while avoiding confusion with the inclusive asOfTimestamp parameter
> in
> >>>> the
> >>>>> other method, given that the names are different.
> >>>>> 2) added a description of "history retention" semantics into the
> >>>>> VersionedKeyValueStore interface Javadoc, and updated the Javadoc for
> >>>>> `get(key, asOfTimestamp)` to mention explicitly that a null result is
> >>>>> returned if the provided timestamp bound is not within history
> >> retention.
> >>>>> 3) added a `delete(key, timestamp)` method (with return type
> >>>>> `ValueAndTimestamp<V>`) to the VersionedKeyValueStore interface.
> >>>>> 4) updated the Javadoc for `segmentInterval` to clarify that the only
> >>>>> reason a user might be interested in this parameter is performance.
> >>>>>
> >>>>> Other points we discussed which did not result in updates include:
> >>>>> 5) whether to automatically update the `min.compaction.lag.ms`
> config
> >> on
> >>>>> changelog topics when history retention is changed -- there's support
> >> for
> >>>>> this but let's not bundle it with this KIP. We can have a separate
> KIP
> >> to
> >>>>> change this behavior for the existing windowed changelog topics, in
> >>>>> addition to versioned changelog topics.
> >>>>> 6) should we expose segmentInterval in this KIP -- let's go ahead and
> >>>>> expose it now since we'll almost certainly expose it (in this same
> >>>> manner)
> >>>>> in a follow-on KIP anyway, and so that poor performance for user
> >>>> workloads
> >>>>> is less likely to be a barrier for users getting started with this
> >>>> feature.
> >>>>> I updated the Javadoc for this parameter to clarify why the Javadoc
> >>>>> mentions performance despite Javadocs typically not doing so.
> >>>>> 7) `get(timestampFrom, timestampTo)` and other methods for IQ -- very
> >>>>> important but deferred to a future KIP
> >>>>> 8) `purge(key)`/`deleteAllVersions(key)` -- deferred to a future KIP
> >>>>>
> >>>>> That leaves only one unresolved discussion point:
> >>>>> 9) whether to include validTo in the return types from `get(...)`. If
> >> we
> >>>> go
> >>>>> with the current proposal of not including validTo in the return
> type,
> >>>> then
> >>>>> it will not be easy to add it in the future (unless we want to add
> >>>> validTo
> >>>>> to ValueAndTimestamp, which feels odd to me). If we think we might
> want
> >>>> to
> >>>>> have validTo in the future, we can change the return type of
> `get(...)`
> >>>> and
> >>>>> `delete(...)` in this proposal from `ValueAndTimestamp<V>` to a new
> >> type,
> >>>>> e.g., `VersionedRecord<V>` or `RecordVersion<V>`, which today will
> look
> >>>> the
> >>>>> same as `ValueAndTimestamp<V>` but in the future we can add validTo
> if
> >> we
> >>>>> want. The cost is a new type which today looks the same as
> >>>>> ValueAndTimestamp.
> >>>>>
> >>>>> Now that I think about it more, the cost to introducing a new type
> >> seems
> >>>>> relatively low. I've added a proposal towards the bottom of the KIP
> >> here
> >>>>> <
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores#KIP889:VersionedStateStores-Additionalreturntimestampsfromget(key,asOfTimestamp)
> >>>>> .
> >>>>> If others also believe that the cost of introducing this new
> interface
> >> is
> >>>>> low (particularly relative to the flexibility it provides us for
> being
> >>>> able
> >>>>> to evolve the class in the future), I will incorporate this proposal
> >> into
> >>>>> the KIP. I think the hardest part of this will be deciding on a name
> >> for
> >>>>> the new class :)
> >>>>>
> >>>>> Pending objections, I'd like to make a call on item (9) and call a
> vote
> >>>> on
> >>>>> this KIP at the end of this week.
> >>>>>
> >>>>> Thanks,
> >>>>> Victoria
> >>>>>
> >>>>> On Thu, Dec 1, 2022 at 9:47 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Thanks Victoria!
> >>>>>>
> >>>>>> (1) About `ReadOnlyVersionedKeyValueStore` -- I am not sure about
> IQv1
> >>>>>> vs IQv2. But you might be right that adding the interface later
> might
> >>>>>> not be an issue -- so it does not matter. Just wanted to double
> check.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (2) About `delete(key, ts)` -- as already discussed, I agree that it
> >>>>>> should have same semantics as `put(key, null, ts)` (delete() needs a
> >>>>>> timestamp). Not sure if `delete()` really needs to return anything?
> I
> >>>>>> would be ok to make it `void` -- but I think it's also semantically
> >>>>>> sound if it returns the "old" value at timestamps `ts` that the
> delete
> >>>>>> actually deleted, as you mentioned -- in the end, a "delete" is a
> >>>>>> physical append anyway (ie, "soft delete") as we want to track
> >> history.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (3)
> >>>>>>> Ah, great question. I think the question boils down to: do we want
> to
> >>>>>>> require that all versioned stores (including custom user
> >>>> implementations)
> >>>>>>> use "history retention" to determine when to expire old record
> >>>> versions?
> >>>>>>
> >>>>>> I personally think, yes. The main reason for this is, that I think
> we
> >>>>>> need to have a clear contract so we can plug-in custom
> implementations
> >>>>>> into the DSL later? -- I guess, having a stricter contract
> initially,
> >>>>>> and relaxing it later if necessary, is the easier way forward than
> >> the
> >>>>>> other way around.
> >>>>>>
> >>>>>> For PAPI users, they are not bound to implement the interface anyway
> >> and
> >>>>>> can just add any store they like by extending the top level
> >> `StateStore`
> >>>>>> interface.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (4) About `segmentInterval`: I am personally fine both ways. Seems
> >> it's
> >>>>>> your call to expose it or not. It seems there is a slight preference
> >> to
> >>>>>> expose it.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (5) About `validTo`: based on my experience, it's usually simpler to
> >>>>>> have it exclusive. It's also how it's defined in "system versioned
> >>>>>> temporal tables" in the SQL standard, and how `AS OF <ts>` queries
> >> work.
> >>>>>>
> >>>>>> For a join, it of course implies that if a table record has
> [100,200)
> >> as
> >>>>>> inclusive `validFrom=100` and exclusive `validTo=200` it would only
> >> join
> >>>>>> with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200
> >> :)).
> >>>>>>
> >>>>>> I would strongly advocate to make the upper bound exclusive (it did
> >>>>>> serve us well in the past to align to SQL semantics). It must be
> >> clearly
> >>>>>> documented of course and we can also name variable accordingly if
> >>>>>> necessary.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (6) About including `validTo` in return types -- it's not easy to
> >> change
> >>>>>> the return type, because the signature of a method is only
> determined
> >> by
> >>>>>> its name and input parameter types, ie, we cannot overload an
> existing
> >>>>>> method to just change the return type, but would need to change its
> >> name
> >>>>>> or parameter list... Not sure if we can or cannot add `validTo` to
> >>>>>> `ValueAndTimestamp` though, but it's a tricky question. Would be
> good
> >> to
> >>>>>> get some more input from others if we think that it would be
> important
> >>>>>> enough to worry about it now or not.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would
> prefer
> >>>>>> to just keep `get()` with two overloads and not add `getAsOf()`; the
> >>>>>> fact that we pass in a timestamp implies we have a point in time
> >> query.
> >>>>>> (It's cleaner API design to leverage method overloads IMHO, and it's
> >>>>>> what we did in the past). Of course, we can name the parameter
> >> `get(key,
> >>>>>> asOfTimestamp)` if we think it's helpful. And in alignment to have
> >>>>>> `validTo` exclusive, `validTo` would be `asOfTimestamp+1` (or
> >> larger),
> >>>>>> in case we return it.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> (8) About updating topic config (ie, history retention and
> compaction
> >>>>>> lag): I think it was actually an oversight to not update topic
> >>>>>> configs if the code changes. There is actually a Jira ticket about
> >> it. I
> >>>>>> would prefer to keep the behavior consistent though and not change
> it
> >>>>>> just for the new versioned-store, but change it globally in one shot
> >>>>>> independent of this KIP.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 12/1/22 10:15 AM, Sagar wrote:
> >>>>>>> Thanks Victoria,
> >>>>>>>
> >>>>>>> I guess an advantage of exposing a method like delete(key,
> timestamp)
> >>>>>> could
> >>>>>>> be that from a user's standpoint, it is a single operation and not
> 2.
> >>>> The
> >>>>>>> equivalent of this method i.e put followed by get is not atomic so
> >>>>>> exposing
> >>>>>>> it certainly sounds like a good idea.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
> >>>>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> Thanks, Sagar and Bruno, for your insights and comments!
> >>>>>>>>
> >>>>>>>>> Sagar: Can we name according to the semantics that you want to
> >>>>>>>> support like `getAsOf` or something like that? I am not sure if we
> >> do
> >>>>>> that
> >>>>>>>> in our codebase though. Maybe the experts can chime in.
> >>>>>>>>
> >>>>>>>> Because it is a new method that will be added, we should be able
> to
> >>>>>> name it
> >>>>>>>> whatever we like. I agree `getAsOf` is more clear, albeit wordier.
> >>>>>>>> Introducing `getAsOf(key, timestamp)` means we could leave open
> >>>>>> `get(key,
> >>>>>>>> timeFrom, timeTo)` to have an exclusive `timeTo` without
> >> introducing a
> >>>>>>>> collision. (We could introduce `getBetween(key, timeFrom, timeTo)`
> >>>>>> instead
> >>>>>>>> to delineate even more clearly, though this is better left for a
> >>>> future
> >>>>>>>> KIP.)
> >>>>>>>>
> >>>>>>>> I don't think there's any existing precedent in the codebase to follow
> >>>> here
> >>>>>> but
> >>>>>>>> I'll leave that to the experts. Curious to hear what others prefer
> >> as
> >>>>>> well.
> >>>>>>>>
> >>>>>>>>> Sagar: With delete, we would still keep the older versions of the
> >> key
> >>>>>>>> right?
> >>>>>>>>
> >>>>>>>> We could certainly choose this for the semantics of delete(...) --
> >> and
> >>>>>> it
> >>>>>>>> sounds like we should too, based on Bruno's confirmation below
> that
> >>>> this
> >>>>>>>> feels more natural to him as well -- but as Bruno noted in his
> >> message
> >>>>>>>> below I think we'll want the method signature to be `delete(key,
> >>>>>>>> timestamp)` then, so that there is an explicit timestamp to
> >> associate
> >>>>>> with
> >>>>>>>> the deletion. In other words, `delete(key, timestamp)` has the
> same
> >>>>>> effect
> >>>>>>>> as `put(key, null, timestamp)`. The only difference is that the
> >>>>>> `put(...)`
> >>>>>>>> method has a `void` return type, while `delete(key, timestamp)`
> can
> >>>> have
> >>>>>>>> `ValueAndTimestamp` as return type in order to return the record
> >> which
> >>>>>> is
> >>>>>>>> replaced (if any). In other words, `delete(key, timestamp)` is
> >>>>>> equivalent
> >>>>>>>> to `put(key, null, timestamp)` followed by `get(key, timestamp)`.
> >>>>>>>>
> >>>>>>>>> Bruno: I would also not change the semantics so that it deletes
> all
> >>>>>>>> versions of
> >>>>>>>> a key. I would rather add a new method purge(key) or
> >>>>>>>> deleteAllVersions(key) or similar if we want to have such a method
> >> in
> >>>>>>>> this first KIP.
> >>>>>>>>
> >>>>>>>> Makes sense; I'm convinced. Let's defer
> >>>>>>>> `purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's
> >>>>>> agreement
> >>>>>>>> that `delete(key, timestamp)` (as described above) is valuable, we
> >> can
> >>>>>> keep
> >>>>>>>> it in this first KIP even though it is syntactic sugar. If this
> >> turns
> >>>>>> into
> >>>>>>>> a larger discussion, we can defer this to a future KIP as well.
> >>>>>>>>
> >>>>>>>>> Bruno: I would treat the history retention as a strict limit.
> [...]
> >>>> You
> >>>>>>>> could also add historyRetentionMs() to the
> VersionedKeyValueStore<K,
> >>>> V>
> >>>>>>>> interface to make the concept of the history retention part of the
> >>>>>>>> interface.
> >>>>>>>>
> >>>>>>>> OK. That's the second vote for rewording the javadoc for
> >>>>>>>> `VersionedKeyValueStore#get(key, timestampTo)` to remove the
> >>>>>> parenthetical
> >>>>>>>> and clarify that history retention should be used to dictate this
> >>>> case,
> >>>>>> so
> >>>>>>>> I'll go ahead and do that. I'll leave out adding
> >>>> `historyRetentionMs()`
> >>>>>> to
> >>>>>>>> the interface for now, though, for the sake of consistency with
> >> other
> >>>>>>>> stores (e.g., window stores) which don't expose similar types of
> >>>>>>>> configurations from their interfaces.
> >>>>>>>>
> >>>>>>>>> Bruno: exclusive vs inclusive regarding validTo timestamp in
> get().
> >>>>>>>> Doesn't this decision depend on the semantics of the join for
> which
> >>>> this
> >>>>>>>> state store should be used?
> >>>>>>>>
> >>>>>>>> Yes, you are correct. As a user I would expect that a stream-side
> >>>> record
> >>>>>>>> with the same timestamp as a table-side record _would_ produce a
> >> join
> >>>>>>>> result, which is consistent with the proposal for timestampTo to
> be
> >>>>>>>> inclusive. (FWIW I tried this out with a Flink temporal join just
> >> now
> >>>>>> and
> >>>>>>>> observed this result as well. Not sure where to look for other
> >>>>>> standards to
> >>>>>>>> validate this expectation.)
> >>>>>>>>
> >>>>>>>>> Bruno: If Streams does not update min.compaction.lag.ms during
> >>>>>>>> rebalances,
> >>>>>>>> users have to do it each time they change history retention in the
> >>>> code,
> >>>>>>>> right? That seems odd to me. What is the actual reason for not
> >>>> updating
> >>>>>>>> the config? How does Streams handle updates to windowed stores?
> >>>>>>>>
> >>>>>>>> Yes, users will have to update min.compaction.lag.ms for the
> >>>> changelog
> >>>>>>>> topic themselves if they update history retention in their code.
> >> This
> >>>> is
> >>>>>>>> consistent with what happens for window stores today: e.g., if a
> >> user
> >>>>>>>> updates grace period for a windowed aggregation, then they are
> >>>>>> responsible
> >>>>>>>> for updating retention.ms on their windowed changelog topic as
> >> well.
> >>>>>>>>
> >>>>>>>> I'm not familiar with the historical context around why this is
> the
> >>>>>> case --
> >>>>>>>> Matthias, do you know?
> >>>>>>>>
> >>>>>>>> My best guess is that Streams does not want to interfere with any
> >>>>>> potential
> >>>>>>>> out-of-band changes by the user between application restarts,
> though
> >>>> I'm
> >>>>>>>> not sure why a user would want to change this specific config to a
> >>>> value
> >>>>>>>> which does not accord with the specified history retention. I
> notice
> >>>>>> that
> >>>>>>>> there is code for validating topic configs and collecting
> validation
> >>>>>> errors
> >>>>>>>> (
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L318-L319
> >>>>>>>> )
> >>>>>>>> but this method is not called from anywhere, even though there are
> >>>> unit
> >>>>>>>> tests for it. I was unable to find history of this validation
> after
> >> a
> >>>>>> quick
> >>>>>>>> search. Hopefully Matthias (or others) has context, otherwise I
> will
> >>>>>> have a
> >>>>>>>> closer look.
> >>>>>>>>
> >>>>>>>> - Victoria
> >>>>>>>>
> >>>>>>>> On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna <cado...@apache.org
> >
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP, Victoria!
> >>>>>>>>>
> >>>>>>>>> I have a couple of comments.
> >>>>>>>>>
> >>>>>>>>> 1. delete(key)
> >>>>>>>>> I think delete(key) should not remove all versions of a key. We
> >>>> should
> >>>>>>>>> use it to close the validity interval of the last version.
> >>>>>>>>> Assuming we have records of different versions for key A:
> >>>>>>>>> (A, e, 0, 2),
> >>>>>>>>> (A, f, 2, 3),
> >>>>>>>>> (A, g, 3, MAX)
> >>>>>>>>>
> >>>>>>>>> delete(A) would update them to
> >>>>>>>>>
> >>>>>>>>> (A, e, 0, 2),
> >>>>>>>>> (A, f, 2, 3),
> >>>>>>>>> (A, g, 3, 5)
> >>>>>>>>> (A, null, 5, MAX)
> >>>>>>>>>
> >>>>>>>>> But then the question arises where does timestamp 5 that closes
> the
> >>>>>>>>> interval in (A, g, 3, 5) and opens the interval in (A, null, 5,
> >> MAX)
> >>>>>>>>> come from. We could use the timestamp at which delete(A) is
> called,
> >>>> but
> >>>>>>>>> actually I do not like that because it seems to me it opens the
> >> doors
> >>>>>> to
> >>>>>>>>> non-determinism. If we use event time for put() we should also
> use
> >> it
> >>>>>>>>> for delete(). Actually, put(A, null, 5) would have the same
> effect
> >> as
> >>>>>>>>> delete(A) in the example above. As syntactic sugar, we could
> >> add
> >>>>>>>>> delete(key, validFrom). (I just realized now that I just repeated
> >>>> what
> >>>>>>>>> Victoria said in her previous e-mail.)
> >>>>>>>>> I agree with Victoria that delete(A) as defined for other state
> >>>> stores
> >>>>>>>>> is hard to re-use in the versioned key-value store.
> >>>>>>>>> I would also not change the semantics so that it deletes all
> >> versions
> >>>>>> of
> >>>>>>>>> a key. I would rather add a new method purge(key) or
> >>>>>>>>> deleteAllVersions(key) or similar if we want to have such a
> method
> >> in
> >>>>>>>>> this first KIP.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2. history retention
> >>>>>>>>> I would remove "(up to store implementation discretion when this
> is
> >>>> the
> >>>>>>>>> case)". I would treat the history retention as a strict limit. If
> >>>> users
> >>>>>>>>> want to implement a less strict behavior, they can still do it.
> >> Maybe
> >>>>>>>>> mention in the javadocs the implications of not adhering strictly
> >> to
> >>>>>> the
> >>>>>>>>> history retention. That is, the DSL might become
> non-deterministic.
> >>>> You
> >>>>>>>>> could also add historyRetentionMs() to the
> >> VersionedKeyValueStore<K,
> >>>> V>
> >>>>>>>>> interface to make the concept of the history retention part of
> the
> >>>>>>>>> interface.
> >>>>>>>>>
> >>>>>>>>> 3. null vs. exception for out-of-bound queries
> >>>>>>>>> I am in favor of null. The record version is not there anymore
> >>>> because
> >>>>>>>>> it expired. This seems to me normal and nothing exceptional. That
> >>>> would
> >>>>>>>>> also be consistent with the behavior of other APIs as already
> >> mentioned.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 4. Exposing segmentInterval
> >>>>>>>>> Since we have evidence that the segment interval affects
> >>>> performance, I
> >>>>>>>>> would expose it. But I find it also OK to expose it once we have
> a
> >>>>>>>>> corresponding metric.
> >>>>>>>>>
> >>>>>>>>> 5. exclusive vs inclusive regarding validTo timestamp in get()
> >>>>>>>>> Doesn't this decision depend on the semantics of the join for
> which
> >>>>>> this
> >>>>>>>>> state store should be used? Should a record on the table side
> that
> >>>> has
> >>>>>>>>> the same timestamp as the record on the stream side join? Or
> should
> >>>>>> only
> >>>>>>>>> records in the table that are strictly before the record on the
> >>>> stream
> >>>>>>>>> side join?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 6. Not setting min.compaction.lag.ms during rebalances
> >>>>>>>>> If Streams does not update min.compaction.lag.ms during
> >> rebalances,
> >>>>>>>>> users have to do it each time they change history retention in
> the
> >>>>>> code,
> >>>>>>>>> right? That seems odd to me. What is the actual reason for not
> >>>> updating
> >>>>>>>>> the config? How does Streams handle updates to windowed stores?
> >> That
> >>>>>>>>> should be a similar situation for the retention time config of
> the
> >>>>>>>>> changelog topic.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 23.11.22 09:11, Sagar wrote:
> >>>>>>>>>> Hi Vicky,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your response!
> >>>>>>>>>>
> >>>>>>>>>> I would just use numbers to refer to your comments.
> >>>>>>>>>>
> >>>>>>>>>> 1) Thanks for your response. Even I am not totally sure whether
> >>>> these
> >>>>>>>>>> should be supported via IQv2 or via store interface. That said,
> I
> >>>>>>>>> wouldn't
> >>>>>>>>>> qualify this as blocking the KIP, so we can
> >> live
> >>>>>>>>>> without it :)
> >>>>>>>>>>
> >>>>>>>>>> 2) Yeah if the 2 APIs for get have different semantics for
> >>>>>> timestampTo,
> >>>>>>>>>> then it could be confusing. I went through the link for temporal
> >>>>>> tables
> >>>>>>>>>> (TFS!) and I now get why the AS OF semantics would have it
> >>>> inclusive.
> >>>>>> I
> >>>>>>>>>> think part of the problem is that the name get on its own is
> not
> >> as
> >>>>>>>>>> expressive as SQL. Can we name according to the semantics that
> you
> >>>>>> want
> >>>>>>>>> to
> >>>>>>>>>> support like `getAsOf` or something like that? I am not sure if
> we
> >>>> do
> >>>>>>>>> that
> >>>>>>>>>> in our codebase though. Maybe the experts can chime in.
> >>>>>>>>>>
> >>>>>>>>>> 3) hmm I would have named it `validUpto`, but again not very
> picky
> >>>>>> about
> >>>>>>>>> it.
> >>>>>>>>>> After going through the link and your KIP, it's a lot clearer to
> >> me.
> >>>>>>>>>>
> >>>>>>>>>> 4) I think delete(key) should be sufficient. With delete, we
> would
> >>>>>>>>>> still keep the older versions of the key, right?
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Sagar.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia
> >>>>>>>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks, Matthias and Sagar, for your comments! I've responded
> >> here
> >>>>>> for
> >>>>>>>>> now,
> >>>>>>>>>>> and will update the KIP afterwards with the outcome of our
> >>>>>> discussions
> >>>>>>>>> as
> >>>>>>>>>>> they resolve.
> >>>>>>>>>>>
> >>>>>>>>>>> ----------- Matthias's comments -----------
> >>>>>>>>>>>
> >>>>>>>>>>>> (1) Why does the new store not extend KeyValueStore, but
> >>>> StateStore?
> >>>>>>>>>>> In the end, it's a KeyValueStore?
> >>>>>>>>>>>
> >>>>>>>>>>> A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>`
> >>>>>>>> because
> >>>>>>>>>>> many of the KeyValueStore methods would not make sense for a
> >>>>>> versioned
> >>>>>>>>>>> store. For example, `put(K key, V value)` is not meaningful
> for a
> >>>>>>>>> versioned
> >>>>>>>>>>> store because the record needs a timestamp associated with it.
> >>>>>>>>>>>
> >>>>>>>>>>> A `VersionedKeyValueStore<K, V>` is more similar to a
> >>>>>>>> `KeyValueStore<K,
> >>>>>>>>>>> ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K,
> V>`),
> >>>> but
> >>>>>>>>> some
> >>>>>>>>>>> of the TimestampedKeyValueStore methods are still problematic.
> >> For
> >>>>>>>>> example,
> >>>>>>>>>>> what does it mean for `delete(K key)` to have return type
> >>>>>>>>>>> `ValueAndTimestamp<V>`? Does this mean that `delete(K key)`
> only
> >>>>>>>> deletes
> >>>>>>>>>>> (and returns) the latest record version for the key? Probably
> we
> >>>> want
> >>>>>>>> a
> >>>>>>>>>>> versioned store to have `delete(K key)` delete all record
> >> versions
> >>>>>> for
> >>>>>>>>> the
> >>>>>>>>>>> given key, in which case the return type is better suited as an
> >>>>>>>>>>> iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
> >>>>>>>>>>> ValueAndTimestamp value)` also has ambiguous semantics for
> >>>> versioned
> >>>>>>>>> stores
> >>>>>>>>>>> (i.e., what does it mean for the key/record to be "absent").
> >>>>>>>>>>>
> >>>>>>>>>>> I agree that conceptually a versioned key-value store is just a
> >>>>>>>>> key-value
> >>>>>>>>>>> store, though. In the future if we redesign the store
> interfaces,
> >>>>>> it'd
> >>>>>>>>> be
> >>>>>>>>>>> great to unify them by having a more generic KeyValueStore
> >>>> interface
> >>>>>>>>> that
> >>>>>>>>>>> allows for extra flexibility to support different types of
> >>>> key-value
> >>>>>>>>>>> stores, including versioned stores. (Or, if you can think of a
> >> way
> >>>> to
> >>>>>>>>>>> achieve this with the existing interfaces today, I'm all ears!)
> >>>>>>>>>>>
> >>>>>>>>>>>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if
> we
> >>>>>> don't
> >>>>>>>>>>> want to support IQ in this KIP, it might be good to add this
> >>>>>> interface
> >>>>>>>>>>> right away to avoid complications for follow up KIPs? Or won't
> >>>> there
> >>>>>>>> be
> >>>>>>>>>>> any complications anyway?
> >>>>>>>>>>>
> >>>>>>>>>>> I don't think there will be complications for refactoring to
> add
> >>>> this
> >>>>>>>>>>> interface in the future. Refactoring out
> >>>>>>>> ReadOnlyVersionedKeyValueStore
> >>>>>>>>>>> from VersionedKeyValueStore would leave VersionedKeyValueStore
> >>>>>>>> unchanged
> >>>>>>>>>>> from the outside.
> >>>>>>>>>>>
> >>>>>>>>>>> Also, is it true that the ReadOnlyKeyValueStore interface is
> only
> >>>>>> used
> >>>>>>>>> for
> >>>>>>>>>>> IQv1 and not IQv2? I think it's an open question as to whether
> we
> >>>>>>>> should
> >>>>>>>>>>> support IQv1 for versioned stores or only IQv2. If the latter,
> >> then
> >>>>>>>>> maybe
> >>>>>>>>>>> we won't need the extra interface at all.
> >>>>>>>>>>>
> >>>>>>>>>>>> (3) Why do we not have a `delete(key)` method? I am ok with
> not
> >>>>>>>>>>> supporting all methods from existing KV-store, but a
> >> `delete(key)`
> >>>>>>>> seems
> >>>>>>>>>>> to be fundamental to have?
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think the semantics of `delete(key)` should be for
> >>>>>>>> versioned
> >>>>>>>>>>> stores? Should `delete(key)` delete (and return) all record
> >>>> versions
> >>>>>>>> for
> >>>>>>>>>>> the key? Or should we have `delete(key, timestamp)` which is
> >>>>>>>> equivalent
> >>>>>>>>> to
> >>>>>>>>>>> `put(key, null, timestamp)` except with a return type to return
> >>>>>>>>>>> ValueAndTimestamp representing the record it replaced?
> >>>>>>>>>>>
> >>>>>>>>>>> If we have ready alignment on what the interface and semantics
> >> for
> >>>>>>>>>>> `delete(key)` should be, then adding it in this KIP sounds
> good.
> >> I
> >>>>>>>> just
> >>>>>>>>>>> didn't want the rest of the KIP to be hung up over additional
> >>>>>>>>> interfaces,
> >>>>>>>>>>> given that we can always add extra interfaces in the future.
> >>>>>>>>>>>
> >>>>>>>>>>>> (4a) Do we need `get(key)`? It seems to be the same as
> `get(key,
> >>>>>>>>>>> MAX_VALUE)`? Maybe is good to have as syntactic sugar though?
> >> Just
> >>>>>> for
> >>>>>>>>>>> my own clarification (should we add something to the
> JavaDocs?).
> >>>>>>>>>>>
> >>>>>>>>>>> Correct, it is just syntactic sugar. I will add a clarification
> >>>> into
> >>>>>>>> the
> >>>>>>>>>>> Javadocs as you've suggested.
> >>>>>>>>>>>
> >>>>>>>>>>>> (4b) Should we throw an exception if a user queries
> out-of-bound
> >>>>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>>>         -> You put it into "rejected alternatives", and I
> >> understand
> >>>>>> your
> >>>>>>>>>>> argument. Would love to get input from others about this
> question
> >>>>>>>>>>> though. -- It seems we also return `null` for windowed stores,
> so
> >>>>>>>> maybe
> >>>>>>>>>>> the strongest argument is to align to existing behavior? Or do
> we
> >>>>>> have
> >>>>>>>>>>> case for which the current behavior is problematic?
> >>>>>>>>>>>
> >>>>>>>>>>> Sure; curious to hear what others think as well.
> >>>>>>>>>>>
> >>>>>>>>>>>> (4c) JavaDoc on `get(key,ts)` says: "(up to store
> implementation
> >>>>>>>>>>> discretion when this is the case)" -> Should we make it a
> >> stricter
> >>>>>>>>>>> contract such that the user can reason about it better (there
> is
> >>>> WIP
> >>>>>>>> to
> >>>>>>>>>>> make retention time a strict bound for windowed stores atm)
> >>>>>>>>>>>         -> JavaDocs on `persistentVersionedKeyValueStore`
> seems to
> >>>>>>>> suggest a
> >>>>>>>>>>> strict bound, too.
> >>>>>>>>>>>
> >>>>>>>>>>> Ah, great question. I think the question boils down to: do we
> >> want
> >>>> to
> >>>>>>>>>>> require that all versioned stores (including custom user
> >>>>>>>>> implementations)
> >>>>>>>>>>> use "history retention" to determine when to expire old record
> >>>>>>>> versions?
> >>>>>>>>>>>
> >>>>>>>>>>> Because the `persistentVersionedKeyValueStore(...)` method
> >> returns
> >>>>>>>>>>> instances of the provided RocksDB-based versioned store
> >>>>>>>> implementation,
> >>>>>>>>>>> which does use history retention for this purpose, that's why
> we
> >>>> can
> >>>>>>>>> very
> >>>>>>>>>>> clearly say that for this store, `get(key, ts)` will return
> null
> >> if
> >>>>>>>> the
> >>>>>>>>>>> provided timestamp bound has fallen out of history retention.
> The
> >>>>>>>>> reason I
> >>>>>>>>>>> left the `VersionedKeyValueStore#get(key, ts)` Javadoc more
> >> generic
> >>>>>>>>> (i.e.,
> >>>>>>>>>>> does not mention history retention) is because maybe a user
> >>>>>>>> implementing
> >>>>>>>>>>> their own custom store will choose a different expiry
> mechanism,
> >>>>>> e.g.,
> >>>>>>>>> keep
> >>>>>>>>>>> the three latest versions for each key regardless of how old
> the
> >>>>>>>>> timestamps
> >>>>>>>>>>> are.
> >>>>>>>>>>>
> >>>>>>>>>>> If we want to require that all versioned stores use history
> >>>> retention
> >>>>>>>> in
> >>>>>>>>>>> order to determine when to expire old records, then I will
> >>>> certainly
> >>>>>>>>> update
> >>>>>>>>>>> the Javadoc to clarify. This is already a requirement for DSL
> >> users
> >>>>>>>>> because
> >>>>>>>>>>> the VersionedBytesStoreSupplier interface requires history
> >>>> retention
> >>>>>>>> to
> >>>>>>>>> be
> >>>>>>>>>>> provided (in order for changelog topic configs to be properly
> >> set),
> >>>>>> so
> >>>>>>>>> it's
> >>>>>>>>>>> just a question of whether we also want to require PAPI users
> to
> >>>> use
> >>>>>>>>>>> history retention too. I had a look at the existing window
> stores
> >>>> and
> >>>>>>>>>>> didn't see precedent for requiring all window stores have a
> >>>> standard
> >>>>>>>>>>> "retention time" concept for how long to keep windows, but if
> we
> >>>> want
> >>>>>>>> to
> >>>>>>>>>>> have a standard "history retention" concept for versioned
> stores
> >> we
> >>>>>>>>>>> certainly can. WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>>> (5a) Do we need to expose `segmentInterval`? For
> >> windowed-stores,
> >>>> we
> >>>>>>>>>>> also use segments but hard-code it to two (it was exposed in
> >>>> earlier
> >>>>>>>>>>> versions but it seems not useful, even if we would be open to
> >>>> expose
> >>>>>>>> it
> >>>>>>>>>>> again if there is user demand).
> >>>>>>>>>>>
> >>>>>>>>>>> If we want to leave it out of this first KIP (and potentially
> >>>> expose
> >>>>>>>> it
> >>>>>>>>> in
> >>>>>>>>>>> the future), that works for me. The performance benchmarks I
> ran
> >>>>>>>> suggest
> >>>>>>>>>>> that this parameter greatly impacts store performance though
> and
> >> is
> >>>>>>>> very
> >>>>>>>>>>> workload dependent. If a user reported poor performance using
> >>>>>>>> versioned
> >>>>>>>>>>> stores for their workload, this is the first parameter I would
> >> want
> >>>>>> to
> >>>>>>>>>>> tune. That said, metrics/observability for versioned stores
> >> (which
> >>>>>>>>> would be
> >>>>>>>>>>> helpful for determining how this parameter should be adjusted)
> >> have
> >>>>>>>> been
> >>>>>>>>>>> deferred to a follow-up KIP, so perhaps that's reason to defer
> >>>>>>>> exposing
> >>>>>>>>>>> this parameter as well.
> >>>>>>>>>>>
> >>>>>>>>>>>> (5b) JavaDocs says: "Performance degrades as more record
> >> versions
> >>>>>> for
> >>>>>>>>>>> the same key are collected in a single segment. On the other
> >> hand,
> >>>>>>>>>>> out-of-order writes and reads which access older segments may
> >> slow
> >>>>>>>> down
> >>>>>>>>>>> if there are too many segments." -- Wondering if JavaDocs
> should
> >>>> make
> >>>>>>>>>>> any statements about expected performance? Seems to be an
> >>>>>>>> implementation
> >>>>>>>>>>> detail?
> >>>>>>>>>>>
> >>>>>>>>>>> I included this sentence to explain why a user might want to
> tune
> >>>>>> this
> >>>>>>>>>>> value / help guide how to think about the parameter, but if we
> >> want
> >>>>>> to
> >>>>>>>>>>> remove it entirely (per the discussion point above) then this
> >>>> Javadoc
> >>>>>>>>> will
> >>>>>>>>>>> be removed with it.
> >>>>>>>>>>>
> >>>>>>>>>>>> (6) validTo timestamp is "exclusive", right? Ie, if I query
> >>>>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next"
> record
> >>>> v2
> >>>>>>>>>>> with validFromV2=ts?
> >>>>>>>>>>>
> >>>>>>>>>>> I actually intended for it to be inclusive (will update the
> KIP).
> >>>> Do
> >>>>>>>> you
> >>>>>>>>>>> think exclusive is more intuitive? The reason I had inclusive
> in
> >> my
> >>>>>>>>> mind is
> >>>>>>>>>>> because it's like a "AS OF <time>" query, which treats the time
> >>>> bound
> >>>>>>>> as
> >>>>>>>>>>> inclusive.
> >>>>>>>>>>>
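A minimal sketch of the inclusive "AS OF" reading described above, assuming a toy in-memory model of a single key's history. The class `AsOfLookupSketch` and its methods are hypothetical names for illustration, not part of the KIP's proposed interfaces or the RocksDB-based implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of one key's version history (illustration only).
final class AsOfLookupSketch {
    // validFrom timestamp -> value; a null value stands for a tombstone.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    // Records a new version that becomes valid at the given timestamp.
    void put(final String value, final long timestamp) {
        versions.put(timestamp, value);
    }

    // Inclusive bound: returns the value of the latest version whose
    // validFrom is <= asOfTimestamp, or null if no such version exists
    // (or if that version is a tombstone).
    String get(final long asOfTimestamp) {
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

Under this reading, querying exactly at the timestamp where v2 was inserted returns v2 rather than v1, which is the behavior the "AS OF <time>" analogy suggests.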
> >>>>>>>>>>>> (7) The KIP says, that segments are stored in the same RocksDB
> >> --
> >>>>>> for
> >>>>>>>>>>> this case, how are efficient deletes handled? For
> windowed-store,
> >>>> we
> >>>>>>>> can
> >>>>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>>>
> >>>>>>>>>>> The way that multiple segments are represented in the same
> >> RocksDB
> >>>> is
> >>>>>>>>> that
> >>>>>>>>>>> the RocksDB keys are prefixed with segment ID. An entire
> segment
> >> is
> >>>>>>>>> deleted
> >>>>>>>>>>> with a single `deleteRange()` call to RocksDB.
> >>>>>>>>>>>
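The segment-prefix idea can be sketched as follows, assuming a sorted in-memory map as a stand-in for the single RocksDB instance. `SegmentPrefixSketch`, its string key encoding, and its method names are hypothetical and for illustration only; the actual store's key layout is not specified in this thread.

```java
import java.util.TreeMap;

// Hypothetical stand-in for one RocksDB instance holding several segments.
final class SegmentPrefixSketch {
    // A single sorted key space, as in a single RocksDB instance.
    private final TreeMap<String, String> db = new TreeMap<>();

    // Fixed-width prefix so that all keys of one segment sort contiguously.
    // Assumes non-negative segment IDs.
    private static String prefix(final long segmentId) {
        return String.format("%019d/", segmentId);
    }

    void put(final long segmentId, final String key, final String value) {
        db.put(prefix(segmentId) + key, value);
    }

    // Drops a whole segment with one range operation over the half-open
    // range [prefix(id), prefix(id + 1)), analogous to a single range delete.
    void deleteSegment(final long segmentId) {
        db.subMap(prefix(segmentId), prefix(segmentId + 1)).clear();
    }

    int size() {
        return db.size();
    }
}
```

Because every key in a segment shares the same prefix, expiring the segment needs no per-key scan by the caller: one range covers exactly that segment's keys and leaves neighboring segments untouched.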
> >>>>>>>>>>>> (8) Rejected alternatives: you propose to not return the
> validTo
> >>>>>>>>>>> timestamp -- if we find it useful in the future to return it,
> >> would
> >>>>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>>>
> >>>>>>>>>>> With the current proposal, there's no clean path. If we think
> >>>> there's
> >>>>>>>> a
> >>>>>>>>>>> good chance we might want to do this in the future, then we
> >> should
> >>>>>>>>> update
> >>>>>>>>>>> the proposed interfaces.
> >>>>>>>>>>>
> >>>>>>>>>>> The current proposed return type from
> `VersionedKeyValueStore<K,
> >>>>>>>>>>> V>#get(key, tsTo)` is `ValueAndTimestamp<V>`. There's no way to
> >>>> add a
> >>>>>>>>>>> second timestamp into `ValueAndTimestamp<V>`, which is why
> >> there's
> >>>> no
> >>>>>>>>> clean
> >>>>>>>>>>> path to include validTo timestamp in the future under the
> >> existing
> >>>>>>>>>>> proposal.
> >>>>>>>>>>>
> >>>>>>>>>>> If we wanted to allow for including validTo timestamp in the
> >>>> future,
> >>>>>>>>> we'd
> >>>>>>>>>>> instead update the return type to be a new `VersionedRecord<V>`
> >>>>>>>> object.
> >>>>>>>>>>> Today a `VersionedRecord<V>` could just include `value` and
> >>>>>>>> `timestamp`,
> >>>>>>>>>>> and in the future we could add `validTo` (names subject to
> >> change)
> >>>>>>>> into
> >>>>>>>>> the
> >>>>>>>>>>> `VersionedRecord` as well. (It'd look a little strange for now
> >>>> since
> >>>>>>>>>>> VersionedRecord is the same as ValueAndTimestamp, but that
> seems
> >>>>>>>> fine.)
> >>>>>>>>>>>
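A rough sketch of what such a return type could look like today, assuming only the two fields mentioned above. The class is named `VersionedRecordSketch` here to make clear it is an illustration rather than the final interface; the accessor names are likewise assumptions.

```java
// Hypothetical illustration of a dedicated return type for versioned reads.
final class VersionedRecordSketch<V> {
    private final V value;
    private final long timestamp;

    VersionedRecordSketch(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    // A later version could add a validTo field and accessor here without
    // changing the return type of the store's get methods.
}
```

The point of the extra type is that callers depend on `VersionedRecordSketch<V>` rather than on `ValueAndTimestamp<V>`, so a second timestamp can be added later as a new accessor instead of a new return type.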
> >>>>>>>>>>> If we choose to do this, I think we should also update the
> return
> >>>>>> type
> >>>>>>>>> of
> >>>>>>>>>>> `VersionedKeyValueStore#get(key)` to be VersionedRecord as
> well,
> >>>>>>>> rather
> >>>>>>>>>>> than having one return ValueAndTimestamp while the other
> returns
> >>>>>>>>>>> VersionedRecord.
> >>>>>>>>>>>
> >>>>>>>>>>> ----------- Sagar's comments -----------
> >>>>>>>>>>>
> >>>>>>>>>>>> 1) Did you consider adding a method similar to :
> >>>>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>>>> I think this could be useful considering that this
> >>>>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, I do think this method is valuable. I think we will
> >> definitely
> >>>>>>>>> want to
> >>>>>>>>>>> support time-range based queries at some point (hopefully
> soon),
> >>>> and
> >>>>>>>>> likely
> >>>>>>>>>>> also key-range based queries (to achieve feature parity with
> >>>> existing
> >>>>>>>>>>> key-value stores).
> >>>>>>>>>>>
> >>>>>>>>>>> It's not immediately clear to me whether these types of queries
> >>>>>> should
> >>>>>>>>> be
> >>>>>>>>>>> supported as part of the store interface or if they should only
> >> be
> >>>>>>>>>>> supported via the `query(...)` method for IQv2. (It's an open
> >>>>>> question
> >>>>>>>>> as
> >>>>>>>>>>> to whether we should support IQv1 for versioned stores or only
> >>>> IQv2.
> >>>>>> A
> >>>>>>>>>>> benefit of IQv2 over IQv1 is that we won't need to add
> individual
> >>>>>>>> store
> >>>>>>>>>>> methods for each type of query, including for all wrapped store
> >>>>>>>> layers.)
> >>>>>>>>>>>
> >>>>>>>>>>> If we have clear non-IQ use cases for these methods (e.g., use
> >>>> cases
> >>>>>>>>> within
> >>>>>>>>>>> processors), then they'll need to be added as part of the store
> >>>>>>>>> interface
> >>>>>>>>>>> for sure. I'm leaning towards adding them as part of the store
> >>>>>>>> interface
> >>>>>>>>>>> but given the ambiguity here, it may be preferable to defer
> to a
> >>>>>>>>> follow-up
> >>>>>>>>>>> KIP. OTOH, if you think the versioned store interface as
> proposed
> >>>> in
> >>>>>>>>> this
> >>>>>>>>>>> KIP is too bare bones to be useful, I'm open to adding it in
> now
> >> as
> >>>>>>>>> well.
> >>>>>>>>>>>
> >>>>>>>>>>>> 2) I have a similar question as Matthias, about the
> timestampTo
> >>>>>>>>> argument
> >>>>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>>>
> >>>>>>>>>>> Same answer (and follow-up question) as above. Do you think it
> >> will
> >>>>>> be
> >>>>>>>>>>> confusing for `get(key, tsTo)` to use an inclusive time bound,
> >>>> while
> >>>>>>>>>>> `get(key, tsFrom, tsTo)` would use an exclusive tsTo time
> bound?
> >>>>>> Maybe
> >>>>>>>>> we
> >>>>>>>>>>> should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or
> >>>>>>>>>>> `getRange(...)` in order to avoid confusion.
> >>>>>>>>>>>
> >>>>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is
> essentially
> >>>> the
> >>>>>>>>>>> timestamp at which the record was inserted. validFrom makes it
> >>>> sound
> >>>>>>>>> like
> >>>>>>>>>>> validTo which can keep changing based on new records while
> *from*
> >>>> is
> >>>>>>>>> fixed.
> >>>>>>>>>>> WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> "It is essentially the timestamp at which the record was
> >> inserted"
> >>>>>> <--
> >>>>>>>>> Yes,
> >>>>>>>>>>> that's correct.
> >>>>>>>>>>>
> >>>>>>>>>>> I borrowed the "validFrom/validTo" terminology from temporal tables, e.g.,
> >>>>>>>>>>> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16
> >>>>>>>>>>> .
> >>>>>>>>>>> I don't believe the terms "validFrom" or "validTo" are
> currently
> >>>>>>>> exposed
> >>>>>>>>>>> anywhere in any of the user-facing interfaces (or Javadocs); I
> >> just
> >>>>>>>>> needed
> >>>>>>>>>>> a way to refer to the concepts in the KIP. Hopefully this is a
> >>>>>>>> non-issue
> >>>>>>>>>>> (at least for now) as a result. Do you have a suggestion for
> >>>>>>>> terminology
> >>>>>>>>>>> that would've been less confusing?
> >>>>>>>>>>>
> >>>>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>>>
> >>>>>>>>>>> Makes sense. It'd be great to get your input on the same follow-up
> >>>>>>>> questions I
> >>>>>>>>>>> asked Matthias above as well :)
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Nov 22, 2022 at 4:25 AM Sagar <
> sagarmeansoc...@gmail.com
> >>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Victoria,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP. Seems like a very interesting idea!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a couple of questions:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) Did you consider adding a method similar to :
> >>>>>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think this could be useful considering that this
> >>>>>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2) I have a similar question as Matthias, about the
> timestampTo
> >>>>>>>>> argument
> >>>>>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is
> essentially
> >>>> the
> >>>>>>>>>>>> timestamp at which the record was inserted. validFrom makes it
> >>>> sound
> >>>>>>>>> like
> >>>>>>>>>>>> validTo which can keep changing based on new records while
> >> *from*
> >>>> is
> >>>>>>>>>>> fixed.
> >>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks!
> >>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <
> >> mj...@apache.org
> >>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP Victoria. Very well written!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Couple of questions (many might just require to add some more
> >>>>>>>> details
> >>>>>>>>>>> to
> >>>>>>>>>>>>> the KIP):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (1) Why does the new store not extend KeyValueStore,
> but
> >>>>>>>>> StateStore?
> >>>>>>>>>>>>> In the end, it's a KeyValueStore?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (2) Should we have a ReadOnlyVersionedKeyValueStore?
> Even
> >>>> if we
> >>>>>>>>> don't
> >>>>>>>>>>>>> want to support IQ in this KIP, it might be good to add this
> >>>>>>>> interface
> >>>>>>>>>>>>> right away to avoid complications for follow up KIPs? Or
> won't
> >>>>>> there
> >>>>>>>>> be
> >>>>>>>>>>>>> any complications anyway?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (3) Why do we not have a `delete(key)` method? I am ok
> >> with
> >>>> not
> >>>>>>>>>>>>> supporting all methods from existing KV-store, but a
> >>>> `delete(key)`
> >>>>>>>>>>> seems
> >>>>>>>>>>>>> to be fundamental to have?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (4a) Do we need `get(key)`? It seems to be the same as
> >>>>>> `get(key,
> >>>>>>>>>>>>> MAX_VALUE)`? Maybe is good to have as syntactic sugar though?
> >>>> Just
> >>>>>>>> for
> >>>>>>>>>>>>> my own clarification (should we add something to the
> >> JavaDocs?).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (4b) Should we throw an exception if a user queries
> >>>>>> out-of-bound
> >>>>>>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>>>>>         -> You put it into "rejected alternatives", and I
> >>>> understand
> >>>>>>>> your
> >>>>>>>>>>>>> argument. Would love to get input from others about this
> >> question
> >>>>>>>>>>>>> though. -- It seems we also return `null` for windowed
> stores,
> >> so
> >>>>>>>>> maybe
> >>>>>>>>>>>>> the strongest argument is to align to existing behavior? Or
> do
> >> we
> >>>>>>>> have
> >>>>>>>>>>>>> case for which the current behavior is problematic?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (4c) JavaDoc on `get(key,ts)` says: "(up to store
> >>>>>> implementation
> >>>>>>>>>>>>> discretion when this is the case)" -> Should we make it a
> >>>> stricter
> >>>>>>>>>>>>> contract such that the user can reason about it better (there
> >> is
> >>>>>> WIP
> >>>>>>>>> to
> >>>>>>>>>>>>> make retention time a strict bound for windowed stores atm)
> >>>>>>>>>>>>>         -> JavaDocs on `persistentVersionedKeyValueStore`
> seems
> >> to
> >>>>>>>>> suggest a
> >>>>>>>>>>>>> strict bound, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (5a) Do we need to expose `segmentInterval`? For
> >>>>>> windowed-stores,
> >>>>>>>>> we
> >>>>>>>>>>>>> also use segments but hard-code it to two (it was exposed in
> >>>>>> earlier
> >>>>>>>>>>>>> versions but it seems not useful, even if we would be open to
> >>>>>> expose
> >>>>>>>>> it
> >>>>>>>>>>>>> again if there is user demand).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (5b) JavaDocs says: "Performance degrades as more
> record
> >>>>>> versions
> >>>>>>>>> for
> >>>>>>>>>>>>> the same key are collected in a single segment. On the other
> >>>> hand,
> >>>>>>>>>>>>> out-of-order writes and reads which access older segments may
> >>>> slow
> >>>>>>>>> down
> >>>>>>>>>>>>> if there are too many segments." -- Wondering if JavaDocs
> >> should
> >>>>>>>> make
> >>>>>>>>>>>>> any statements about expected performance? Seems to be an
> >>>>>>>>>>> implementation
> >>>>>>>>>>>>> detail?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (6) validTo timestamp is "exclusive", right? Ie, if I
> >> query
> >>>>>>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next"
> >> record
> >>>>>> v2
> >>>>>>>>>>>>> with validFromV2=ts?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (7) The KIP says, that segments are stored in the same
> >>>> RocksDB
> >>>>>> --
> >>>>>>>>> for
> >>>>>>>>>>>>> this case, how are efficient deletes handled? For
> >> windowed-store,
> >>>>>> we
> >>>>>>>>>>> can
> >>>>>>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        (8) Rejected alternatives: you propose to not return
> the
> >>>>>> validTo
> >>>>>>>>>>>>> timestamp -- if we find it useful in the future to return it,
> >>>> would
> >>>>>>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 11/16/22 9:57 PM, Victoria Xia wrote:
> >>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have a proposal for introducing versioned state stores in
> >>>> Kafka
> >>>>>>>>>>>>> Streams.
> >>>>>>>>>>>>>> Versioned state stores are similar to key-value stores
> except
> >>>> they
> >>>>>>>>>>> can
> >>>>>>>>>>>>>> store multiple record versions for a single key. This KIP
> >>>> focuses
> >>>>>>>> on
> >>>>>>>>>>>>>> interfaces only in order to limit the scope of the KIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Victoria
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
