[
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524402#comment-15524402
]
Elias Levy commented on KAFKA-4212:
-----------------------------------
The general use case is joining updates to two tables over a limited period of
time.
Consider a hypothetical monitoring service that allows clients to query the
status of nodes. The application may wish to inform clients whenever the status
of a node they have queried changes, but only if the client has queried the
status during the past 24 hours and the node's latest status differs from the
last status the client received.
To do so, the service can consume a stream of client node status queries with
their responses and a stream of node status updates. From the stream of client
node status queries, the service would maintain a cache of the last status for
a node sent to a client, with entries expiring after 24 hours. From the node
status updates, the service would maintain a mapping of node to latest status.
When a client query is received, the service can check the node status mapping
to see if there is a newer status and, if there is, generate a notification.
When a node status update is received, the service can check the last status
sent to clients in the cache and generate a notification with the new status to
all clients that previously queried for the node's status.
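For concreteness, here is a rough sketch of what the query-side processor could
look like. It is illustrative only: it uses a more recent Kafka Streams
Processor API than the 0.10.0.1 release this issue targets, the store names and
the string encoding of the values are made up, and the node-status mapping is
treated as a plain {{KeyValueStore}} here (the TTL'ed variant is discussed just
below):
{code:java}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

// Query side: key = node id, value = "clientId|statusReturned" (made-up encoding).
public class ClientQueryProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private WindowStore<String, String> sentStatuses;   // 24 h cache of statuses handed to clients
    private KeyValueStore<String, String> nodeStatuses; // node id -> latest known status

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.sentStatuses = context.getStateStore("sent-statuses");
        this.nodeStatuses = context.getStateStore("node-statuses");
    }

    @Override
    public void process(Record<String, String> record) {
        String nodeId = record.key();
        String[] parts = record.value().split("\\|", 2);
        String clientId = parts[0];
        String statusSent = parts[1];

        // Remember what this client was told, so later node status updates can be
        // joined against it. The window store's retention provides the 24 h expiry.
        sentStatuses.put(nodeId, clientId + "|" + statusSent, record.timestamp());

        // If a newer, different status is already known for the node, notify the client.
        String latest = nodeStatuses.get(nodeId);
        if (latest != null && !latest.equals(statusSent)) {
            context.forward(new Record<>(clientId, nodeId + "|" + latest, record.timestamp()));
        }
    }
}
{code}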
As an optimization, the mapping of nodes to latest status can also be a cache
with a TTL, since you don't need to keep the status of a node that hasn't
changed in more than 24 hours; you'll never receive a delayed node status query
to match it against.
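Under those assumptions, the two state stores could be declared along these
lines (again just a sketch with made-up names; the 24 hour retention on the
window store is what provides the TTL):
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class StatusStores {
    // Statuses handed out to clients, keyed by node id. retainDuplicates = true
    // keeps one entry per query instead of overwriting, so many clients per node fit.
    static final StoreBuilder<WindowStore<String, String>> SENT_STATUSES =
        Stores.windowStoreBuilder(
            Stores.persistentWindowStore("sent-statuses",
                Duration.ofHours(24),  // retention: the 24 h TTL
                Duration.ofHours(24),  // window size
                true),
            Serdes.String(), Serdes.String());

    // Latest known status per node. A plain key-value store here; with the TTL
    // optimization above it could be another 24 h window store instead.
    static final StoreBuilder<KeyValueStore<String, String>> NODE_STATUSES =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("node-statuses"),
            Serdes.String(), Serdes.String());
}
{code}
Wiring things together is then a matter of adding the two source topics, the
two processors, and these stores to a topology, plus a sink for the
notifications.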
Abstractly this is equivalent to a {{KTable}}-{{KTable}} inner join where
entries in each {{KTable}} expire after some TTL, and where one table has a
composite primary key (node id and client id on one {{KTable}} vs. just node id
on the other).
It could also be thought of as a windowed {{KTable}}-{{KTable}} join (although
in that case records that fall outside the window would never be used and would
just waste space), or a windowed {{KStream}}-{{KStream}} join of table updates
where only the latest values are used (i.e. updates in the window are discarded
if there is a newer update). Again, though, these would be joins where the
primary keys are not identical, as one is a composite.
Alas, Kafka Streams does not support windowed {{KTable}}-{{KTable}} joins,
TTL'ed {{KeyValueStore}} s, or joins across {{KTable}} s and/or {{KStream}} s
with different keys.
That said, the above service can be implemented by joining the client status
query and node status update streams using custom processors and by abusing
{{WindowStore}}. {{WindowStore}} can serve as a form of TTL'ed
{{KeyValueStore}}: it drops old values that fall out of its window, and you can
iterate in reverse order and use only the latest value. And since it allows you
to store multiple values for the same key (node id), you can record the node
status handed out to clients (node id as key; client id, status, and timestamp
as value) and then, when a node status update comes in and you perform the
join, iterate over all entries for the given node id, keeping only the latest
one for each client id.
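To illustrate that last point, here is a matching sketch of the update-side
processor (same caveats as above: a recent Processor API, made-up store names
and value encoding). It fetches every status handed out for the node during the
past 24 hours from the {{WindowStore}}, keeps only the newest entry per client
id, and notifies the clients whose last seen status differs:
{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Update side: key = node id, value = new status.
public class NodeUpdateProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private WindowStore<String, String> sentStatuses;   // node id -> "clientId|status" per query
    private KeyValueStore<String, String> nodeStatuses; // node id -> latest known status

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.sentStatuses = context.getStateStore("sent-statuses");
        this.nodeStatuses = context.getStateStore("node-statuses");
    }

    @Override
    public void process(Record<String, String> record) {
        String nodeId = record.key();
        String newStatus = record.value();
        nodeStatuses.put(nodeId, newStatus);

        // Scan everything handed out for this node over the past 24 hours; entries
        // come back oldest first, so later puts overwrite earlier ones and the map
        // ends up holding the most recent status sent to each client.
        Map<String, String> latestPerClient = new HashMap<>();
        Instant now = Instant.ofEpochMilli(record.timestamp());
        try (WindowStoreIterator<String> iter =
                 sentStatuses.fetch(nodeId, now.minus(Duration.ofHours(24)), now)) {
            while (iter.hasNext()) {
                String[] parts = iter.next().value.split("\\|", 2);
                latestPerClient.put(parts[0], parts[1]);
            }
        }

        // Notify every client whose last seen status differs from the new one.
        for (Map.Entry<String, String> e : latestPerClient.entrySet()) {
            if (!e.getValue().equals(newStatus)) {
                context.forward(new Record<>(e.getKey(), nodeId + "|" + newStatus,
                    record.timestamp()));
            }
        }
    }
}
{code}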
> Add a key-value store that is a TTL persistent cache
> ----------------------------------------------------
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Elias Levy
> Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some
> period of time. I.e., they need to maintain a TTL cache of values potentially
> larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.
> Neither is an exact fit to this use case.
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as
> required, but does not support expiration. The TTL option of RocksDB is
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp.
> However, this store can be repurposed as a cache by fetching the items in
> reverse chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be
> useful to have an official and proper TTL cache API and implementation.