Thanks for bringing this up, Elias! FYI: A related idea we talked about is that of a timestamped state store (for lack of a better description), where conceptually every entry would be a tuple of (key, value, timestamp). The use case was slightly different from what Elias describes: here, it was about using the timestamp to track when an entry was last updated, i.e. (key, value, last-updated-timestamp). The scenario was out-of-order records, where users would like to prevent late-arriving older records (with "outdated" record values) from overwriting the values of newer records that were already processed into the state store / KTable. Think: "For a new incoming record, update the value for the record's key in the state store if and only if the record's timestamp is >= the last-updated-timestamp. If you do update the value, then also replace the last-updated-timestamp with the record's timestamp."
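To make that concrete, below is a minimal sketch of such an "update only if newer" check, written against the low-level Processor API. Caveat: this is illustrative only -- the TimestampedValue holder, the store name "timestamped-store", and the String types are all made up for the example, and the store would of course need to be registered with the topology separately.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical value holder pairing a value with its last-updated timestamp.
class TimestampedValue {
    final String value;
    final long lastUpdated;
    TimestampedValue(String value, long lastUpdated) {
        this.value = value;
        this.lastUpdated = lastUpdated;
    }
}

// Sketch of the "update if and only if the record is newer" semantics above.
class LastWriteWinsProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, TimestampedValue> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "timestamped-store" is an assumed name for illustration only.
        this.store = (KeyValueStore<String, TimestampedValue>)
            context.getStateStore("timestamped-store");
    }

    @Override
    public void process(String key, String value) {
        long recordTimestamp = context.timestamp();
        TimestampedValue existing = store.get(key);
        // Only overwrite if the incoming record is at least as new as the
        // stored one, so late-arriving older records cannot clobber newer values.
        if (existing == null || recordTimestamp >= existing.lastUpdated) {
            store.put(key, new TimestampedValue(value, recordTimestamp));
        }
    }

    @Override
    public void punctuate(long timestamp) { } // no-op; required by the pre-1.0 Processor API

    @Override
    public void close() { }
}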
My question is: how much of the TTL idea is about (1) a more granular, per-key expiration of state than what we currently provide, vs. (2) decision-making about whether or not another downstream update should be sent to a specific consumer ("we only wish to generate such an update if we last informed the client about the property during some time span")? From what I understand, Elias is arguing that he is actually interested in (2), but for that he needs (1), because the current compaction/expiration functionality is either too slow (we keep old keys around for too long; not bad, but less efficient) or too fast (we'd lose knowledge about a key even though we still need it). But correct me if I am mistaken, Elias. :-) In other words, I'm trying to understand more clearly how TTL, handling/decision-making of updates, and state store expiration relate to each other here.

On Wed, Feb 8, 2017 at 7:46 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:

> The use case is a simple one. You can think of it as an update mechanism. One stream is a set of tuples consisting of a consumer id, an object id, the value of some property of the object, and a timestamp. This stream represents a record of what we told some consumer the value of some property of the object was at a point in time. The other stream is a set of tuples consisting of an object id, an object property value, and a timestamp. This stream is just the changelog of the value of the object property. The job joins these streams to generate messages to clients informing them of changes to an object property that they previously inquired about.
>
> The requirement for a TTL cache rather than a standard key-value store comes from the fact that we only wish to generate such an update if we last informed the client about the property during some time span. If there were no TTL, the state would grow without bounds, as there is no signal to remove an entry from the state.
>
> Note that this is not the same as a KStream-KStream windowed join. For the first stream we are only interested in keeping track of the object property value last handed out to a client. For the second stream we are only interested in keeping track of the latest value of an object's property. It more closely resembles a KTable-KTable join, but one where the entries of the table have a TTL and one table has a key that is a subset of the other's (object id vs. the compound [object id, agent id]).
>
> My guess is that others have use cases that call for a store that expires entries, as otherwise the store may grow unbounded, since there are no signals other than time to trigger removal of entries from the store.
>
> As for the use of the RocksDB TTL feature, after looking at it, I don't think it will be a good fit. The TTL is computed from insertion time, not from a timestamp associated with an entry that can be passed in. That would be problematic if a stream is delayed or you are reprocessing old data. Therefore the segmented store seems like a better basis for an implementation.
>
> On Wed, Feb 8, 2017 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Elias,
> >
> > I would love to solicit more feedback from the community on how commonly a persistent TTL KV store is needed. Maybe you can share your use cases first here in this thread?
> >
> > As for its implementation, I think leveraging RocksDB's TTL feature would be a good option.
> > One tricky part, though, is how we can insert the corresponding "tombstone" messages into the changelogs as well in an efficient way (this is also the main reason we did not add this feature in the first release of Kafka Streams). I remember RocksDB's TTL feature does have a compaction listener interface, but I am not sure if it is available in JNI. That may be worth exploring.
> >
> > Guozhang
> >
> > On Mon, Feb 6, 2017 at 8:17 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> >
> > > We have a use case within a Streams application that requires a persistent TTL key-value cache store. As Streams does not currently offer such a store, I've implemented it by abusing WindowStore, as it allows for a configurable retention period. Nonetheless, it is not an ideal solution, as you can only iterate forward on the iterator returned by fetch(), whereas the use case calls for a reverse iterator, as we are only interested in the latest value for an entry.
> > >
> > > I am curious as to the appetite for a KIP to add such a TTL caching store. KAFKA-4212 is the issue I opened requesting such a store. Do others have a need for one? If there is interest in such a KIP, I can get one started.
> > >
> > > If there is interest, there are two ways such a store could be implemented. It could make use of the RocksDB TTL feature, or it could mirror WindowStore and make use of multiple segmented RocksDBs, possibly reusing the RocksDBSegmentedBytesStore from the latest refactoring of the stores. The former delegates most of the work to RocksDB compaction, although likely at the expense of greater write amplification. The latter is more efficient at dropping expired entries, but potentially less space efficient.
> > >
> > > Thoughts?
> >
> > --
> > -- Guozhang
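PS: For anyone following along, here is roughly what the WindowStore workaround Elias describes might look like. This is only a sketch under assumptions (class, store, and variable names are invented, and this is not necessarily how Elias implemented it), but it shows the forward-iteration inefficiency he mentions:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

class TtlCacheViaWindowStore {
    // Returns the most recent value for `key` written within the TTL span,
    // or null if nothing was written within the span (the entry has "expired").
    static String latestValue(WindowStore<String, String> store, String key,
                              long nowMs, long ttlMs) {
        String latest = null;
        try (WindowStoreIterator<String> iter = store.fetch(key, nowMs - ttlMs, nowMs)) {
            // fetch() only iterates oldest-to-newest, so the whole range must
            // be walked to reach the newest value -- the inefficiency Elias notes.
            while (iter.hasNext()) {
                KeyValue<Long, String> entry = iter.next();
                latest = entry.value;
            }
        }
        return latest;
    }
}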
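And regarding the RocksDB TTL option discussed above: RocksDB exposes it in its Java API as TtlDB. A minimal sketch follows (the path and TTL value are made up, and try-with-resources assumes a rocksdbjni version whose handles implement AutoCloseable); it also illustrates the insertion-time caveat Elias raises:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

class TtlDbExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             // 3600s TTL: entries become eligible for deletion during compaction
             // once an hour has passed since they were *inserted* -- there is no
             // way to supply a per-record timestamp, which is the limitation
             // Elias points out for delayed streams and reprocessing.
             TtlDB db = TtlDB.open(options, "/tmp/ttl-db-example", 3600, false)) {
            db.put("some-key".getBytes(), "some-value".getBytes());
            byte[] value = db.get("some-key".getBytes());
        }
    }
}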