Thanks for bringing this up, Elias!

FYI: A related idea we talked about is that of a timestamped state store
(for lack of a better description), where conceptually every entry would
be a tuple of (key, value, timestamp).  The use case was slightly
different from what Elias describes: here, it was about using the
timestamp to track when an entry was last updated, i.e. (key, value,
last-updated-timestamp).  The scenario was out-of-order records, where
users would like to prevent late-arriving older records (with "outdated"
record values) from overwriting the values of newer records that were
already processed in the state store / KTable.  Think: "For a new incoming
record, update the value for the record's key in the state store if and
only if the record's timestamp is >= the last-updated-timestamp.  If you
do update the value, then also replace the last-updated-timestamp with the
record's timestamp."

My question is: how much of the TTL idea is about (1) a more granular,
per-key expiration of state than what we currently provide, vs. (2)
decision-making for whether or not another downstream update should be sent
to a specific consumer ("we only wish to generate such an update if we last
informed the client about the property during some time span").  From what
I understand, Elias is arguing that he actually is interested in (2), but
for that he needs (1) because the current compaction/expiration
functionality is either too slow (we keep old keys around for too long; not
bad but less efficient) or too fast (we'd lose knowledge about a key even
though we still need it).  But correct me if I am mistaken, Elias. :-)

In other words, I'm trying to understand more clearly how TTL,
handling/decision-making of updates, and state store expiration relate to
each other here.
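
To make my reading of (2) concrete, here is a rough sketch of the
lookup-and-decide step, assuming a hypothetical TTL-backed key-value store
whose get() returns null once an entry is older than the TTL.  The class
and method names are mine, not an existing API:

    import org.apache.kafka.streams.state.KeyValueStore;

    public class PropertyChangeNotifier {

        // Hypothetical TTL-backed store keyed by "objectId|consumerId",
        // holding the property value we last told that consumer about.
        // Entries are assumed to disappear once older than the TTL.
        private final KeyValueStore<String, String> lastInformedStore;

        public PropertyChangeNotifier(
                final KeyValueStore<String, String> lastInformedStore) {
            this.lastInformedStore = lastInformedStore;
        }

        // Called when the object's property value changes; returns the
        // value to send to the consumer, or null if no update is due.
        public String maybeNotify(final String objectId,
                                  final String consumerId,
                                  final String newValue) {
            final String key = objectId + "|" + consumerId;
            final String lastInformed = lastInformedStore.get(key);
            if (lastInformed == null) {
                // We never informed this consumer, or the entry expired:
                // per (2), no update should be sent.
                return null;
            }
            if (lastInformed.equals(newValue)) {
                // Nothing changed since we last informed the consumer.
                return null;
            }
            return newValue;
        }
    }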



On Wed, Feb 8, 2017 at 7:46 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> The use case is a simple one.  You can think of it as an update mechanism.
> One stream is a set of tuples consisting of consumer id, an object id, the
> value of some property of the object, and a timestamp.  This stream
> represents a record of what we told some consumer the value of some
> property of the object was at a point in time.  The other stream is a set
> of tuples consisting of object id, object property value, and timestamp.
> This stream is just the change log of the value of the object property.
> The job joins these streams to generate messages to clients to inform of
> changes to an object property that they previously inquired about.
>
> The requirement for a TTL cache rather than a standard key-value store
> comes from the fact that we only wish to generate such an update if we
> last informed the client about the property during some time span.  If
> there were no TTL, the state would grow without bound, as there is no
> signal to remove an entry from the state.
>
> Note that this is not the same as a KStream-KStream windowed join.  For the
> first stream we are only interested in keeping track of an object's
> property value last handed out to a client.  For the second stream we are
> only interested in keeping track of the latest value of an object's
> property. It more closely resembles a KTable-KTable join, but one where the
> entries of the table have a TTL and one table has a key that is a subset of
> the other (object id vs the compound [object id, agent id]).
>
> My guess is that others have use cases that call for a store that expires
> entries, as otherwise the store may grow unbounded, since there are no
> signals other than time to trigger removal of entries from the store.
>
> As for the use of RocksDB's TTL feature, after looking at it, I don't
> think it will be a good fit.  The TTL is computed from insertion time,
> not from a timestamp associated with an entry that can be passed in.
> That would be problematic if a stream is delayed or you are reprocessing
> old data.  Therefore the segmented store seems like a better basis for an
> implementation.
>
>
>
> On Wed, Feb 8, 2017 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Elias,
> >
> > I would love to solicit more feedback from the community on how commonly
> > used a TTL persistent KV store would be. Maybe you can share your use
> > cases first here in this thread?
> >
> > As for its implementation, I think leveraging RocksDB's TTL feature would
> > be a good option. One tricky part, though, is how we can insert the
> > corresponding "tombstone" messages into the changelogs as well in an
> > efficient way (this is also the main reason we did not add this feature
> > in the first release of Kafka Streams). I remember RocksDB's TTL feature
> > does have a compaction listener interface, but I am not sure if it is
> > available in JNI. That may be worth exploring.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Feb 6, 2017 at 8:17 PM, Elias Levy <fearsome.lucid...@gmail.com>
> > wrote:
> >
> > > We have a use case within a Streams application that requires a
> > > persistent TTL key-value cache store.  As Streams does not currently
> > > offer such a store, I've implemented it by abusing WindowStore, as it
> > > allows for a configurable retention period.  Nonetheless, it is not an
> > > ideal solution, as you can only iterate forward on the iterator
> > > returned by fetch(), whereas the use case calls for a reverse
> > > iterator, as we are only interested in the latest value for an entry.
> > >
> > > I am curious as to the appetite for a KIP to add such a TTL caching
> > > store.  KAFKA-4212 is the issue I opened requesting such a store.  Do
> > > others have a need for one?  If there is interest in such a KIP, I can
> > > get one started.
> > >
> > > If there is interest, there are two ways such a store could be
> > > implemented.  It could make use of RocksDB's TTL feature, or it could
> > > mirror WindowStore and make use of multiple segmented RocksDBs,
> > > possibly reusing the RocksDBSegmentedBytesStore from the latest
> > > refactoring of the stores.  The former delegates most of the work to
> > > RocksDB compaction, although likely at the expense of greater write
> > > amplification.  The latter is more efficient at dropping expired
> > > entries, but potentially less space efficient.
> > >
> > > Thoughts?
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
