The use case is a simple one. You can think of it as an update mechanism. One stream is a set of tuples consisting of a consumer id, an object id, the value of some property of the object, and a timestamp. This stream is a record of what we told a given consumer the value of that property was at a point in time. The other stream is a set of tuples consisting of an object id, an object property value, and a timestamp; it is simply the change log of the object property's value. The job joins these streams to generate messages that inform clients of changes to an object property they previously inquired about.
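A minimal sketch of the join described above, in plain Java with in-memory maps rather than Kafka Streams state stores. All names (UpdateJoin, onInquiry, onChange, Told, Latest, Update) are hypothetical. The TTL is measured against the maximum event timestamp seen so far, and, for brevity, only the consumer-side table expires entries; the latest-value table is left unbounded here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory sketch of the update join; not the Kafka Streams API.
final class UpdateJoin {
    // What we last told a consumer about an object's property, and when.
    record Told(String value, long timestamp) {}
    // Latest value of an object's property according to the change log.
    record Latest(String value, long timestamp) {}
    // An update message to send to a consumer.
    record Update(String consumerId, String objectId, String value, long timestamp) {}

    private final long ttlMs;
    private long streamTime = 0L; // max event timestamp seen so far (assumes non-negative timestamps)
    // objectId -> (consumerId -> Told), i.e. the compound [object id, consumer id] key
    private final Map<String, Map<String, Told>> told = new HashMap<>();
    // objectId -> latest property value (not expired in this sketch)
    private final Map<String, Latest> latest = new HashMap<>();

    UpdateJoin(long ttlMs) { this.ttlMs = ttlMs; }

    // The TTL is measured from the record's own timestamp, not from insertion time.
    private boolean expired(long timestamp) { return timestamp < streamTime - ttlMs; }

    // First stream: remember what a consumer was told; catch up if a newer, different value is already known.
    List<Update> onInquiry(String consumerId, String objectId, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<Update> out = new ArrayList<>();
        if (expired(timestamp)) return out; // too old to ever trigger an update
        told.computeIfAbsent(objectId, k -> new HashMap<>()).put(consumerId, new Told(value, timestamp));
        Latest l = latest.get(objectId);
        if (l != null && l.timestamp() > timestamp && !l.value().equals(value)) {
            out.add(new Update(consumerId, objectId, l.value(), l.timestamp()));
        }
        return out;
    }

    // Second stream: keep only the latest value per object and notify consumers told a different value within the TTL.
    List<Update> onChange(String objectId, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<Update> out = new ArrayList<>();
        Latest prev = latest.get(objectId);
        if (prev != null && prev.timestamp() > timestamp) return out; // stale, out-of-order change
        latest.put(objectId, new Latest(value, timestamp));
        Map<String, Told> consumers = told.get(objectId);
        if (consumers == null) return out;
        Iterator<Map.Entry<String, Told>> it = consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Told> e = it.next();
            Told t = e.getValue();
            if (expired(t.timestamp())) {
                it.remove(); // TTL elapsed: forget the entry instead of sending an update
            } else if (t.timestamp() <= timestamp && !t.value().equals(value)) {
                out.add(new Update(e.getKey(), objectId, value, timestamp));
            }
        }
        return out;
    }
}
```

Keying the first table by object id and then consumer id is what lets a single change-log record fan out to every consumer that asked about that object, which is the part that does not map onto a plain KTable-KTable join with identical keys.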
The requirement for a TTL cache rather than a standard key-value store comes from the fact that we only wish to generate such an update if we last informed the client about the property within some time span. Without a TTL, the state would grow without bound, as there is no signal to remove an entry from the state.

Note that this is not the same as a KStream-KStream windowed join. For the first stream we are only interested in keeping track of the object property value last handed out to a client. For the second stream we are only interested in keeping track of the latest value of an object's property. It more closely resembles a KTable-KTable join, but one where the entries of the tables have a TTL and one table has a key that is a subset of the other's (object id vs. the compound [object id, agent id]).

My guess is that others have use cases that call for a store that expires entries, as otherwise the store may grow unbounded when there is no signal other than time to trigger the removal of entries.

As for the RocksDB TTL feature, after looking at it I don't think it will be a good fit. The TTL is computed from insertion time, not from a timestamp associated with an entry that can be passed in. That would be problematic if a stream is delayed or you are reprocessing old data. Therefore the segmented store seems like a better basis for an implementation.

On Wed, Feb 8, 2017 at 9:53 AM, Guozhang Wang wrote:
> Hello Elias,
>
> I would love to solicit more feedback from the community on how commonly
> a TTL persistent KV store would be used. Maybe you can share your use cases
> first here in this thread?
>
> As for its implementation, I think leveraging RocksDB's TTL feature would
> be a good option. One tricky part, though, is how we can insert the
> corresponding "tombstone" messages into the changelogs as well in an
> efficient way (this is also the main reason we did not add this feature in
> the first release of Kafka Streams).
> I remember RocksDB's TTL feature does have a compaction listener
> interface, but I'm not sure if it is available in JNI. That may be worth
> exploring.
>
>
> Guozhang
>
>
> On Mon, Feb 6, 2017 at 8:17 PM, Elias Levy wrote:
>
> > We have a use case within a Streams application that requires a persistent
> > TTL key-value cache store. As Streams does not currently offer such a
> > store, I've implemented it by abusing WindowStore, as it allows for a
> > configurable retention period. Nonetheless, it is not an ideal solution,
> > as you can only iterate forward on the iterator returned by fetch(),
> > whereas the use case calls for a reverse iterator, as we are only
> > interested in the latest value for an entry.
> >
> > I am curious as to the appetite for a KIP to add such a TTL caching store.
> > KAFKA-4212 is the issue I opened requesting such a store. Do others have a
> > need for them? If there is interest in such a KIP, I can get one started.
> >
> > If there is interest, there are two ways such a store could be
> > implemented. It could make use of the RocksDB TTL feature, or it could
> > mirror WindowStore and make use of multiple segmented RocksDBs, possibly
> > reusing the RocksDBSegmentedBytesStore from the latest refactoring of the
> > stores. The former delegates most of the work to RocksDB compaction,
> > although likely at the expense of greater write amplification. The latter
> > is more efficient at dropping expired entries, but potentially less space
> > efficient.
> >
> > Thoughts?
>
>
> --
> -- Guozhang
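A rough sketch of the segmented approach discussed in this thread, in plain Java rather than on top of RocksDBSegmentedBytesStore, whose API is not reproduced here. The class name SegmentedTtlStore, the segment interval parameter, and the in-memory changelog list are all hypothetical. Entries are bucketed by their own event timestamp, so expiry follows the data's time even when a stream is delayed or old data is reprocessed, and dropping a segment emits one tombstone per key, which is one possible answer to the changelog question raised above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a segmented, event-time TTL key-value store.
// Plain Java for illustration; not a Kafka Streams or RocksDB API.
final class SegmentedTtlStore<K, V> {
    // A stored value plus the event timestamp it was written with.
    record TimestampedValue<T>(T value, long timestamp) {}
    // Stand-in for a changelog record; a null value is a tombstone.
    record ChangelogRecord<RK, RV>(RK key, RV value) {}

    private final long retentionMs;
    private final long segmentIntervalMs;
    private long streamTime = 0L; // max event timestamp seen (assumes non-negative timestamps)
    // segment id -> entries whose latest timestamp falls in that segment, oldest segment first
    private final TreeMap<Long, Map<K, TimestampedValue<V>>> segments = new TreeMap<>();
    // key -> id of the single segment currently holding it
    private final Map<K, Long> segmentOf = new HashMap<>();
    // in-memory stand-in for the changelog topic
    final List<ChangelogRecord<K, V>> changelog = new ArrayList<>();

    SegmentedTtlStore(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    // Entries with a timestamp below this are expired.
    private long cutoff() { return streamTime - retentionMs; }

    void put(K key, V value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        if (timestamp < cutoff()) return; // already past its TTL, e.g. delayed or reprocessed data
        Long oldSegment = segmentOf.get(key);
        if (oldSegment != null) {
            Map<K, TimestampedValue<V>> old = segments.get(oldSegment);
            if (old.get(key).timestamp() > timestamp) return; // keep the newer value
            old.remove(key); // a key lives only in the segment of its latest timestamp
        }
        long segment = Math.floorDiv(timestamp, segmentIntervalMs);
        segments.computeIfAbsent(segment, s -> new HashMap<>()).put(key, new TimestampedValue<>(value, timestamp));
        segmentOf.put(key, segment);
        changelog.add(new ChangelogRecord<>(key, value));
        dropExpiredSegments();
    }

    V get(K key) {
        Long segment = segmentOf.get(key);
        if (segment == null) return null;
        TimestampedValue<V> tv = segments.get(segment).get(key);
        // The segment may not be dropped yet even though this entry is past its TTL.
        return tv.timestamp() < cutoff() ? null : tv.value();
    }

    // Drop every segment that ends at or before the cutoff, emitting one tombstone per key.
    private void dropExpiredSegments() {
        Iterator<Map.Entry<Long, Map<K, TimestampedValue<V>>>> it = segments.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<K, TimestampedValue<V>>> entry = it.next();
            if ((entry.getKey() + 1) * segmentIntervalMs > cutoff()) break; // remaining segments are newer
            for (K key : entry.getValue().keySet()) {
                segmentOf.remove(key);
                changelog.add(new ChangelogRecord<>(key, null));
            }
            it.remove();
        }
    }
}
```

The trade-off mentioned in the thread shows up directly: whole segments are dropped cheaply, but an entry can outlive its TTL by up to one segment interval before its segment goes away, which get() hides by checking the stored timestamp.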
