Hi everyone,
I’ve been exploring an Indexed Expiration approach that could be more efficient
for large state stores: instead of relying on a periodic full-store scan,
KStreams would maintain an internal timestamp index that it manages
automatically.
Whenever a record is written to a TimestampedKeyValueStore, KStreams would:
Write (key → ValueAndTimestamp) to the main store, just like today.
Internally also write (timestamp, key) into a secondary index store.
During cleanup, instead of scanning the full keyspace, the punctuator would
just look up expired timestamps in the index and remove the corresponding keys
from the main store.
So the flow becomes something like this:
User → TimestampedKeyValueStore
↳ Main Store: key → ValueAndTimestamp
↳ Internal Index: (timestamp, key)
Eviction → Scan index for expired timestamps → Delete from main store + remove
index entries
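To make this concrete, here is a minimal sketch of the dual-write and
indexed-eviction flow. A TreeMap stands in for RocksDB's ordered keyspace,
and all class and method names here are illustrative, not actual Kafka
Streams APIs:

```java
import java.util.*;

// Toy model of the proposed dual-write + indexed eviction.
// TreeMap stands in for RocksDB's sorted keyspace.
class TtlIndexedStore {
    static final class ValueAndTs {
        final String value; final long ts;
        ValueAndTs(String value, long ts) { this.value = value; this.ts = ts; }
    }

    // Main store: key -> (value, timestamp)
    final Map<String, ValueAndTs> main = new HashMap<>();
    // Index: timestamp -> keys written at that timestamp, iterated oldest-first
    final NavigableMap<Long, Set<String>> index = new TreeMap<>();

    void put(String key, String value, long ts) {
        ValueAndTs old = main.put(key, new ValueAndTs(value, ts));
        if (old != null) {  // drop the stale index entry on overwrite
            Set<String> keys = index.get(old.ts);
            if (keys != null) {
                keys.remove(key);
                if (keys.isEmpty()) index.remove(old.ts);
            }
        }
        index.computeIfAbsent(ts, t -> new HashSet<>()).add(key);
    }

    // Punctuator body: only touches expired index entries, never the full store.
    int evictOlderThan(long cutoff) {
        int evicted = 0;
        Iterator<Map.Entry<Long, Set<String>>> it =
            index.headMap(cutoff, false).entrySet().iterator();
        while (it.hasNext()) {
            for (String key : it.next().getValue()) {
                main.remove(key);
                evicted++;
            }
            it.remove();
        }
        return evicted;
    }
}
```

Note that the put path also has to drop the stale index entry when a key is
overwritten, otherwise the index could evict a key whose timestamp has since
been refreshed.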
This makes cleanup far more efficient, as we only touch the subset of keys
that are actually expired, not the whole store. There’s no issue with
partitioning because both stores (main and index) would be local per task.
Each task owns its own timestamp index alongside its main store, so expiration
happens per-partition, fully aligned with how KStreams already handles
changelog restoration.
In other words, both the main and index store share the same changelog topic
and partition mapping, so during recovery they move together and stay
consistent. The index key is basically (timestamp, key) — timestamp as the
primary sort key, and the original key to ensure uniqueness.
That makes it efficient to scan “everything older than now minus TTL” using
RocksDB’s natural key ordering.
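One way to get that ordering is to encode the timestamp as 8 big-endian bytes
followed by the serialized key, so that plain lexicographic byte comparison
(RocksDB's default comparator) matches numeric timestamp order for
non-negative epoch millis. A hypothetical sketch of the encoding:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative encoding for the composite index key (timestamp, key).
// Big-endian timestamp bytes make unsigned lexicographic order equal
// numeric timestamp order, giving an oldest-first scan for free.
final class IndexKey {
    static byte[] encode(long timestamp, String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length)
                .putLong(timestamp)   // ByteBuffer is big-endian by default
                .put(k)
                .array();
    }

    // Unsigned lexicographic comparison, like RocksDB's default comparator.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }
}
```

With this layout, the eviction scan is just a range iteration from the start
of the index up to encode(now - ttl, "").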
Thoughts?
Best,
Ankur
________________________________
From: Nick Telford <[email protected]>
Sent: Tuesday, October 7, 2025 3:17 PM
To: [email protected] <[email protected]>
Subject: [EXT] Re: [DISCUSS] KIP-1225: Add Optional TTL Support to Kafka
Streams State Stores
Warning External sender Do not click on any links or open any attachments
unless you trust the sender and know the content is safe.
Hi everyone,
I agree with Lucas that we ideally want a solution that scales to larger
stores.
My team actually use a custom RocksDB-based StateStore that implements a
TTL using a combination of Kafka's topic retention time and RocksDB's TtlDB.
It's not perfect, because TtlDB is based on insertion time, rather than
record timestamp; but for our purposes it's acceptable.
IMO, the ideal solution would be:
1. The ability to specify a custom timestamp when writing to RocksDB's
TtlDB via WriteOptions. This functionality does not yet exist, so would
need to be contributed upstream.
2. To sync the retention in both the changelog topic and RocksDB, but with
a configurable margin of error (to compensate for imperfectly synchronised
system clocks). This would be added to the changelog retention time, such
that the changelog is retained for (slightly) longer than the local store
retention.
3. When admitting records from the changelog into the store, any records
with a timestamp older than the store retention are then simply discarded.
This ensures that the local store's retention dictates the availability of
records.
We would also, ideally, have an implementation for InMemoryKeyValueStore
that can efficiently evict old entries.
Regarding the KIP: I think it would be good to add methods to the
Materialized interface, to enable the DSL to define a TTL without having to
supply a full StoreBuilder.
Regards,
Nick
On Tue, 7 Oct 2025 at 08:51, Lucas Brutschy <[email protected]>
wrote:
> Hi Ankur,
>
> Thanks for the KIP! A built-in approach for TTLs in state store is an
> important extension of Kafka Streams. Indeed, it does not seem ideal
> that people must implement this manually by writing clean-up
> tombstones from a punctuation, particularly because it requires
> scanning the entire state store and will not scale to larger state
> stores.
>
> I like how your KIP makes it extremely simple to set up a TTL by just
> calling withTtl on the state store. However, the approach to expiring
> records is still based on periodic scans of the entire state store.
> Additionally, the punctuation interval appears to be hardcoded to one
> minute.
>
> I was wondering if you have explored alternative implementations to
> expire keys in a way that scales to larger stores as well? One idea is
> to use RocksDB's "best effort" Compaction Filter and retention on the
> changelog topic to reclaim space based on the TTL. Then, ensure a
> consistent view of the local state store and a state store restored
> from the changelog topic by precisely enforcing the TTL when the state
> store is accessed. Another idea is to change the underlying RocksDB
> representation to perform periodic expiration more efficiently, for
> example, by having an index by timestamp and expiring keys using a
> prefix scan on that index.
>
> These are vague ideas that require detailed validation. But I am not
> sure that we should introduce a TTL expiration mechanism that doesn't
> scale to larger state stores before exploring these options.
>
> WDYT?
>
> Cheers,
> Lucas
>
> On Tue, Oct 7, 2025 at 7:26 AM Ankur Sinha <[email protected]>
> wrote:
> >
> > Hello,
> >
> > I’d like to start the discussion 🙂 for KIP-1225: Add Optional TTL
> > Support to Kafka Streams State Stores:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1225%3A+Add+Optional+TTL+Support+to+Kafka+Streams+State+Stores
> >
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-19759
> > KIP Wiki:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1225%3A+Add+Optional+TTL+Support+to+Kafka+Streams+State+Stores
> >
> > Regards,
> > Ankur Sinha
> >
> > ________________________________
> > From: Matthias J. Sax <[email protected]>
> > Sent: Tuesday, October 7, 2025 1:04 AM
> > To: [email protected] <[email protected]>
> > Subject: [EXT] Re: [DISCUSS] KIP-19759: Add built-in TTL (Time-to-Live)
> support for Kafka Streams State Stores
> >
> >
> > If you want to start a KIP, you need to write one :) A Jira ticket is
> > not a KIP
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> >
> >
> > -Matthias
> >
> > This email was screened for spam and malicious content but exercise
> caution anyway.
> >
> >
> >
> > On 10/6/25 10:26 AM, Ankur Sinha wrote:
> > > Hi all,
> > >
> > > I’d like to start the discussion for KAFKA-19759: Add built-in TTL
> (Time-to-Live) support for Kafka Streams State Stores.
> > >
> > > JIRA:
> > > https://issues.apache.org/jira/browse/KAFKA-19759
> > > KIP Wiki: (will be created shortly)
> > >
> > > Streams users often need per-key TTL functionality (e.g., for
> cache-like state, per-key deduplication, or automatic cleanup).
> > > Currently, this requires manual punctuators and tombstones.
> > >
> > > This KIP proposes adding a built-in `withTtl(Duration ttl)` option to
> state stores,
> > > which would handle key expiration and changelog consistency
> automatically.
> > >
> > > Please see the JIRA for full details and general analysis.
> > > Looking forward to the discussion.
> > >
> > > Best regards,
> > > Ankur Sinha
> > >
>