Hi Sagar,

Thanks for the response. Your use case makes sense to me; I figured it must be 
something like that. 

On a pragmatic level, in the near term, you might consider basically doing the 
same thing we did in KIP-213. If you swap out the store types for Byte/byte[] 
and “manually” invoke the serdes in your own logic, then you can use the same 
algorithm we did to derive the range scan boundaries from your desired prefix.

For the actual KIP, it seems like we would need significant design improvements 
to be able to do any mixins, so I think we should favor proposing either to 
just add to the existing interfaces or to create brand new interfaces, as 
appropriate, for now. Given that prefix can be converted to a range query at a 
low level, I think we can probably explore adding prefix to the existing 
interfaces with a default implementation. 

It seems like that just leaves the question of how to define the type of the 
prefix. To be honest, I don’t have any great ideas here. Are you able to 
generate some creative solutions, Sagar?

Thanks,
John

On Tue, May 26, 2020, at 06:42, Sagar wrote:
> Hi John,
> 
> Thanks for the detailed reply. I was a bit crammed with work last week so
> couldn't respond earlier so apologies for that.
> 
> First of all, thanks for the context that both you and Adam have
> provided me on the issues faced previously. As I can clearly see, while I
> was able to cut some corners while writing some test cases or benchmarks,
> to be able to stitch together a store with prefix scan into an actual
> topology needs more work. I am sorry for the half baked tests that I wrote
> without realising and you have rightly put it when you said these
> challenges aren't obvious up front.
> 
> Now, coming back to the other points, I spent some time going through the
> KIP-213 and also some of the code snippets that are talked about in that
> KIP. With the detailed explanation that you provided, it is now obvious to
> me that keeping a generic type for keys like K won't work oob and hence a
> decision was made to use Bytes as the key type.
> 
> I just had another thought on this though. I looked at the range function
> that was added in the ReadOnlyKeyValueStore. While the Key and the Value
> mentioned in that method is generic, internally almost all queries end up
> querying using Bytes in some or the other form. I looked at not just
> RocksDb Store but other stores like InMemory store or MemoryLRU and this
> seems to be the pattern. I think this stems from the fact that these stores
> while implementing KeyValueStore pass Bytes, byte[] as the K and V values.
> Classes like MeteredKeyValueStore which don't do this, still use Bytes.wrap
> to wrap the passed keys and values and invoke the range method.
> 
> So, the point I am trying to make is, with the same behaviour - and
> ignoring for a moment that it's a separate interface which I am trying to
> "mix-in"- the issues with the key types could be resolved. I may be wrong
> though so would like to know your thoughts on this. Infact unknowingly the
> interface implementation of PrefixSeekableType in RockDBStateStore also
> passes Bytes and bytes[] as K and V.
> 
> The second part of exposing it via the publically accessible interfaces to
> which we downcast while building the topology (like KeyValueStore), I can
> clearly see now that mixing-in the way I tried to won't work. My intention
> all along was not to hamper the flow of those stores which don't support
> prefix scan as yet and hence the separate interface. But, I agree that for
> this to work, it needs to be part of some pre-defined store types like
> KVStore etc. Right now, I don't have an answer to this but mostly it would
> have to be moved there and implemented across all stores(if we see the
> worth in prefix scans :) )
> 
> Regarding the motivation, I am sorry if I wasn't clear. This originated
> from one of my own use cases with kafka streams where i needed to find some
> keys based upon certain prefix. Infact it's similar to the
> RangeScanCombinedKeyUsage diagram in KIP-213 where the otherTable tries to
> find entries in the state store based upon the FK. I was using
> KevValueStore to be precise. I also remember having a slack conversation on
> this, and I was told that this isn't supported right now, but some other
> users shared their experiences on how with some hacks they are able to
> perform prefix scans even though their use case fits the bill for a prefix
> scan. That kind of motivated me to take a stab at it. Unfortunately, I have
> lost the slack chat because of some cleanup at the slack channel level. I
> will try and update the ambiguous motivation statement in the near future.
> 
> Lastly, I would like to point out, that your response was not at all
> discouraging. On the contrary it was really insightful and it's always good
> to learn/discover new things :)
> 
> Thanks!
> Sagar.
> 
> On Fri, May 15, 2020 at 7:37 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi, Sagar!
> >
> > Thanks for this KIP. I'm sorry it took me so long to reply. I'll number my
> > points differently to avoid confusion.
> >
> > I can provide some additional context on the difficulties we previously
> > faced in KIP-213 (which you and Adam have already discussed).
> >
> > J1) In your KIP, you propose the following interface:
> >
> > public interface PrefixSeekableStore<K, V> {
> >     KeyValueIterator<K, V> prefixSeek(K prefix);
> > }
> >
> > This is roughly the same thing that Adam and I were considering
> > before. It has a hidden problem, that it assumes that prefixes of
> > keys in the key space are also in the key space. In other words, this
> > is a store with key type K, and the API assumes that prefixes are also
> > of type K. This is true for some key types, like String or Bytes, but not
> > for others.
> >
> > For example, if the keys are UUIDs, then no prefix is also a UUID. If the
> > key is a complex data type, like Windowed<K> in our own DSL, then
> > we would absolutely want to query all keys with the same record key
> > (the K part), or the same window start time, but in neither case is the
> > prefix actually a Windowed<K>.
> >
> > You can skirt the issue by defining a third type parameter, maybe KP, that
> > is the "prefix" type, but this would also be awkward for many usages.
> >
> > J2) There is a related problem with serialization. Whether something
> > is a prefix or not depends not on the Java key (K), but on the binary
> > format that is produced when you use a serde on the key. Whether
> > we say that the prefix must also be a K or whether it gets its own type,
> > KP, there are problems.
> >
> > In the latter case, we must additionally require a second set of serdes
> > for the prefixes, but there's no obvious way to incorporate this in the
> > API, especially not in the DSL.
> >
> > In either case, for the API to actually work, we need to know ahead
> > of time that the Serde will produce a binary key that starts with the
> > part that we wish to use as a prefix. For example, what we were doing
> > briefly in KIP-213 (where we had complex keys, similar to Windowed<K>)
> > was to define "dummy" values that indicate that a Windowed<K> is actually
> > just a prefix key, not a real key. Maybe the window start time would be
> > null or the key part would be null. But we also had to define a serde
> > that would very specifically anticipate which component of the complex
> > key would need to be used in a prefix key. Having to bring all these
> > parts together in a reliable, easy-to-debug, fashion gives me some doubt
> > that people would actually be able to use this feature in complicated
> > programs without driving themselves crazy.
> >
> > J3) Thanks so much for including benchmarks and tests! Unfortunately,
> > these don't include everything you need to really plug into the Streams
> > API. I think when you push it a little farther, you'll realize what Adam
> > was talking about wrt the interface difficulties.
> >
> > In your benchmark and tests, you directly construct the store and then
> > use it, but in a real Streams application, you can only provide your
> > implementation in a StoreSupplier, for example via the Materialized
> > parameter. Then, to use the store from inside a Processor, you'd have
> > to get it by name from the ProcessorContext, and then cast it to one of
> > the pre-defined store types, KeyValueStore, WindowedStore, or
> > SessionStore. It won't work to "mix in" your interface because the
> > processor gets a store that's wrapped in layers that handle serialization,
> > change-logging, recording metrics, and caching.
> >
> > To use the store through IQ, you have to provide a QueriableStoreType
> > to KafkaStreams#store, and you get back a similarly wrapped store.
> >
> > I think our only choices to add an interface like yours is either to add
> > it to one of the existing store types, like KeyValueStore or
> > WindowedStore, or to define a completely new store hierarchy, meaning
> > you have to duplicate all the "wrapper" layers in Streams.
> >
> > I think if you write an "end-to-end" test, where you write a Streams app,
> > provide your store, and then use it in a Processor and through IQ,
> > you'll see what I'm talking about.
> >
> > IIRC, those three points were the ones that ultimately led us to abandon
> > the whole idea last time and just register the stores with key type Bytes.
> > I think some creative solutions may yet be possible, but it'll take some
> > more design work to get there.
> >
> > Can I ask what your motivation is, exactly, for proposing this feature?
> > The motivation just says "some users may want to do it", which has
> > the advantage that it's impossible to disagree with, but doesn't provide
> > a lot of concrete detail ;)
> >
> > Specifically, what I'm wondering is whether you wanted to use this as
> > part of a KayValue store, which might be a challenge, or whether you
> > wanted to use it for more efficient scans in a WindowedStore, like
> > Guozhang.
> >
> > Thanks again for the KIP! I hope my response isn't too discouraging;
> > I just wanted to convey the challenges we faced last time, since they
> > are all not obvious up front.
> >
> > Best regards,
> > -John
> >
> >
> > On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> > > Whoops, I guess I didn't finish reading the KIP all the way to the end
> > > earlier. Thanks
> > > for including the link to the RocksDB PR in the KIP!
> > >
> > > I have one additional question about the proposal: do you plan to also
> > add
> > > this
> > > prefix seek API to the dual column family iterators? These are used by
> > > RocksDBTimestampedStore (which extends RocksDBStore), for example the
> > > *RocksDBDualCFRangeIterator*
> > >
> > > Thanks for the KIP!
> > >
> > > On Thu, May 14, 2020 at 10:50 AM Sagar <sagarmeansoc...@gmail.com>
> > wrote:
> > >
> > > > Hey @Adam,
> > > >
> > > > Thanks for sharing your experience with using prefix seek. I did look
> > at
> > > > your code for RocksDBPrefixIterator, infact I have repurposed that
> > class
> > > > itself since it wasn't being used else where. Regarding how I plan to
> > > > expose them through-out the state stores, what I have tried to do is
> > add it
> > > > as a separate interface. So, basically, it is not at the same level as
> > the
> > > > *range function so to speak. The reason I did that is currently I feel
> > not
> > > > all state stores are a natural fit for prefix seek. As I mentioned in
> > the
> > > > KIP as well, the current equivalent to it could be
> > BulkLoadingStore(not in
> > > > terms of functionality but in terms of how it is also not implemented
> > by
> > > > all of them). So, that ways I am not needing to stub them across all
> > the
> > > > state-stores and we can implement it only where needed. For example,
> > in the
> > > > PR that I have put for reference in the KIP, you can see that I have it
> > > > implemented only for RocksDB.
> > > >
> > > > @Guozhang,
> > > >
> > > > Thanks for the feedback. Those are very interesting questions and I
> > will
> > > > try my best to answer based upon whatever limited understanding I have
> > > > developed so far :)
> > > >
> > > > 1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I
> > hadn't
> > > > looked at that config. I did look it up after you pointed it out and
> > seems
> > > > it's more for hash-based memtables? I may be wrong though. But what I
> > would
> > > > say is that, the changes I had made were not exactly from a correctness
> > > > stand point but more from trying to showcase how we can implement these
> > > > changes. The idea was that once we see the merit in this approach then
> > we
> > > > can add some of the tunings( and I would need your team's assistance
> > there
> > > > :D).
> > > >
> > > > 2) Regarding the similarity of `RocksDBPrefixIterator` and
> > > > `RocksDBRangeIterator`, yes the implementations look more or less
> > similar.
> > > > So, in terms of performance, they might be similar. But semantically,
> > they
> > > > can solve 2 different use-cases. The range seek is useful when we know
> > both
> > > > from and to. But if we consider use-cases where we want to find keys
> > with a
> > > > certain prefix, but we don't know if what it's start and end is, then
> > > > prefix seek would come in more handy. The point that I am trying to
> > make is
> > > > that it can extend the scope of state stores from just point lookups to
> > > > somewhat being able to speculative queries where by users can search
> > if a
> > > > certain pattern exists. I can vouch for this personally because I
> > wanted to
> > > > use state stores for one such use case and since this option wasn't
> > there,
> > > > I had to do some other things. An equivalent to this could be SCAN
> > operator
> > > > in Redis. (Not trying to compare the Redis and state stores but trying
> > to
> > > > give some context).
> > > >
> > > > Regarding the point on bloom filter, I think there are certain
> > > > optimisations that are being talked about in case of prefix seek here:
> > > >
> > > >
> > > >
> > https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> > > > Again
> > > > this isn't something that I have explored fully. Also, on the prefix
> > seek
> > > > page on RocksDB they mention that there's a prefix iterating technique
> > > > called Prefix Bloom Filter.
> > > >
> > > > 3) Regarding the question on length of bytes for seek v/s prefix seek,
> > I am
> > > > not entirely sure about that scenario. What I have understood is that
> > > > at-least for Rocks DB, it is more performant for short iterator queries
> > > > that longer ones.
> > > >
> > > > 4) Regarding the last question on placing it within Segment, the
> > reason I
> > > > didn't do that way, is that I thought we shouldn't tie this feature
> > only to
> > > > RocksDB. I agree that I got this idea while looking/reading about
> > RocksDB
> > > > but if we keep it outside the purview of RocksDB and keep it as a
> > pluggable
> > > > entity, then a) it remains generic by not being tied to any specific
> > store
> > > > and b) no change is needed at all for any of the other stores which
> > haven't
> > > > implemented it.
> > > >
> > > > I am not sure of any of the above points make sense but as I said,
> > this is
> > > > based out of my limited understanding of the codebase. So, pardon any
> > > > incorrect/illogical statements plz!
> > > >
> > > > @Sophie,
> > > >
> > > > Thanks for bringing that point up! I have mentioned about that PR in
> > the
> > > > KIP under a section called Other considerations. Nonetheless, thanks
> > for
> > > > pointing it out!
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > >
> > > > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > > wrote:
> > > >
> > > > > Not to derail this KIP discussion, but to leave a few notes on some
> > of
> > > > the
> > > > > RocksDB points that have come up:
> > > > >
> > > > > Someone actually merged some long overdue performance improvements to
> > > > > the RocksJava implementation (the PR was opened back in 2017! yikes).
> > > > > I haven't looked into the prefix seek API closely enough to know how
> > > > > relevant
> > > > > this particular change is, and they are still improving things, but
> > it
> > > > > gives me some
> > > > > faith.
> > > > >
> > > > > There are some pretty promising results reported on the PR:
> > > > > https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
> > > > >
> > > > > Regarding the custom comparator, they also recently merged this
> > > > performance
> > > > > <https://github.com/facebook/rocksdb/pull/6252>
> > > > > improvement <https://github.com/facebook/rocksdb/pull/6252>. The
> > tl;dr
> > > > is
> > > > > they reduced the slowdown of a custom comparator in Java
> > > > > (relative to the native C++) from ~7x to ~5.2x at best. Which is
> > still
> > > > not
> > > > > great, but it
> > > > > would be interesting to run our own benchmarks and see how this
> > stacks
> > > > up.
> > > > >
> > > > > Of course, these are all new changes and as such will require us to
> > > > upgrade
> > > > > rocks to 6.x which means they have to wait for us to release a 3.0.
> > But
> > > > > there's
> > > > > some talk about 3.0 coming in the next few releases so consider it
> > food
> > > > for
> > > > > not-so-future thought
> > > > >
> > > > >
> > > > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <
> > adam.bellem...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang
> > > > > >
> > > > > > For clarity, the issues I was running into was not about the actual
> > > > > > *prefixSeek* function itself, but about exposing it to the same
> > level
> > > > of
> > > > > > access as the *range* function throughout Kafka Streams. It
> > required a
> > > > > lot
> > > > > > of changes, and also required that most state stores stub it out
> > since
> > > > it
> > > > > > wasn't clear how they would implement it. It was basically an
> > > > > overreaching
> > > > > > API change that was easily solved (for the specific prefix-scan in
> > FKJ)
> > > > > by
> > > > > > simply using *range*. So to be clear, the blockers were
> > predominantly
> > > > > > around correctly handling the API changes, nothing to do with the
> > > > > > mechanisms of the RocksDB prefix scanning.
> > > > > >
> > > > > > As for KAFKA-5285 I'll look into it more to see if I can get a
> > better
> > > > > > handle on the problem!
> > > > > >
> > > > > > Hope this helps clear it up.
> > > > > >
> > > > > > Adam
> > > > > >
> > > > > >
> > > > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello Adam,
> > > > > > >
> > > > > > > I'm wondering if you can provide a bit more context on the
> > blockers
> > > > of
> > > > > > > using prefixSeek of RocksDB (I saw you have a
> > RocksDBPrefixIterator
> > > > > class
> > > > > > > but not used anywhere yet)? I'm currently looking at ways to
> > allow
> > > > some
> > > > > > > secondary indices with rocksDB following some existing approaches
> > > > > > > from CockroachDB etc so I'm very curious to learn your
> > experience.
> > > > > > >
> > > > > > > 1) Before considering any secondary indices, a quick thought is
> > that
> > > > > for
> > > > > > > (key, timeFrom, timeTo) queries, we can easily replace the
> > current
> > > > > > > `range()` impl with a `prefixRange()` impl via a prefix iterator;
> > > > > though
> > > > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much more
> > complicated
> > > > > indeed
> > > > > > > and hence existing `range()` impl may still be used.
> > > > > > >
> > > > > > > 2) Another related issue I've been pondering for a while is
> > > > > > > around KAFKA-5285: with the default lexicograpic byte comparator,
> > > > since
> > > > > > the
> > > > > > > key length varies, the combo (key, window) would have
> > interleaving
> > > > byte
> > > > > > > layouts like:
> > > > > > >
> > > > > > > AAA0001          (key AAA, timestamp 0001)
> > > > > > > AAA00011        (key AAA0, timestamp 0011)
> > > > > > > AAA0002          (key AAA, timestamp 0002)
> > > > > > >
> > > > > > > which is challenging for prefix seeks to work efficiently.
> > Although
> > > > we
> > > > > > can
> > > > > > > overwrite the byte-comparator in JNI it is very expensive and the
> > > > cost
> > > > > of
> > > > > > > JNI overwhelms its benefits. If you've got some ideas around it
> > > > please
> > > > > > lmk
> > > > > > > as well.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <
> > > > > adam.bellem...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Sagar
> > > > > > > >
> > > > > > > > I implemented a very similar interface for KIP-213, the
> > foreign-key
> > > > > > > joiner.
> > > > > > > > We pulled it out of the final implementation and instead used
> > > > RocksDB
> > > > > > > range
> > > > > > > > instead. You can see the particular code where we use
> > > > > > RocksDB.range(...)
> > > > > > > to
> > > > > > > > get the same iterator result.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > > > > > > >
> > > > > > > > We pulled it out because there were numerous awkward
> > acrobatics to
> > > > > > > > integrate *prefixSeek()* function into the Kafka Streams code.
> > > > > > > Basically, I
> > > > > > > > wanted to be able to access *prefixSeek()* the same way I can
> > > > access
> > > > > > > > *range()* for any state store, and in particular use it for
> > storing
> > > > > > data
> > > > > > > > with a particular foreign key (as per the previous URL).
> > However, I
> > > > > > found
> > > > > > > > out that it required way too many changes to expose the
> > > > > *prefixSeek()*
> > > > > > > > functionality while still being able to leverage all the nice
> > Kafka
> > > > > > > Streams
> > > > > > > > state management + supplier functionality, so we made a
> > decision
> > > > just
> > > > > > to
> > > > > > > > stick with *range()* and pull everything else out.
> > > > > > > >
> > > > > > > > I guess my question here is, how do you anticipate using
> > > > > *prefixSeek()*
> > > > > > > > within the framework of Kafka Streams, or the Processor API?
> > > > > > > >
> > > > > > > > Adam
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar <
> > sagarmeansoc...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on the KIP that I created
> > > > below
> > > > > to
> > > > > > > add
> > > > > > > > > prefix scan support in State Stores:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Sagar.
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to