I agree; the whole discussion boils down to a very basic point: Kafka Streams 
is pure Java, so it could call a JNI bridge to access RocksDB's native 
compaction filter. Shipping that native JNI code inside Streams would be 
possible but very tricky: it would add cross-platform build complexity and 
extra maintenance, and risks crashing the JVM if there are bugs.

Best,
Ankur
________________________________
From: Nick Telford <[email protected]>
Sent: Tuesday, October 7, 2025 4:22 PM
To: [email protected] <[email protected]>
Subject: Re: [EXT] Re: [DISCUSS] KIP-1225: Add Optional TTL Support to Kafka 
Streams State Stores

Warning: External sender. Do not click on any links or open any attachments 
unless you trust the sender and know the content is safe.

On reflection, I actually really like Lucas' suggestion of using the
RocksDB CompactionFilter to discard timed-out records. The downside of this
approach is that, at the moment, RocksJNI doesn't support Java
CompactionFilters.

The advantage of this approach is that the CompactionFilter can be made
aware of the ValueAndTimestamp format, re-using the timestamps already
supplied by Kafka, instead of storing additional timestamps.
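For illustration, the decision such a filter would make could look like the
sketch below. The class and method names are hypothetical; the one assumption
is that the serialized value begins with the 8-byte big-endian record
timestamp that timestamped stores already persist:

```java
import java.nio.ByteBuffer;

// Sketch of the expiry decision a timestamp-aware CompactionFilter would
// make. Assumes the serialized value starts with an 8-byte big-endian
// record timestamp (the ValueAndTimestamp layout); names are hypothetical.
class TtlFilterSketch {

    // Read the record timestamp from the front of the serialized value.
    static long timestampOf(byte[] serializedValue) {
        return ByteBuffer.wrap(serializedValue, 0, 8).getLong();
    }

    // true => the record has outlived its TTL and compaction may drop it.
    static boolean shouldDiscard(byte[] serializedValue, long nowMs, long ttlMs) {
        return timestampOf(serializedValue) < nowMs - ttlMs;
    }
}
```

Because the filter only inspects bytes the store already writes, it needs no
extra per-record state.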

This is preferable to having a separate "index", because it avoids the
write and space amplification issues that maintaining an index has,
especially for stores with a very large number of entries and/or high
throughput of writes.

Cheers,
Nick

This email was screened for spam and malicious content but exercise caution 
anyway.



On Tue, 7 Oct 2025 at 11:09, Ankur Sinha <[email protected]> wrote:

> Hi everyone,
>
> I’ve been exploring an Indexed Expiration approach that could be more
> efficient for large state stores. The idea is to maintain an internal
> timestamp index that Kafka Streams manages automatically, instead of
> relying on a periodic full-store scan.
>
> Whenever a record is written to a TimestampedKeyValueStore, Kafka Streams
> would:
> 1. Write (key → ValueAndTimestamp) to the main store, just like today.
> 2. Internally also write (timestamp, key) into a secondary index store.
> During cleanup, instead of scanning the full keyspace, the punctuator
> would just look up expired timestamps in the index and remove the
> corresponding keys from the main store.
> So the flow becomes something like this:
>
> User → TimestampedKeyValueStore
> ↳ Main Store: key → ValueAndTimestamp
> ↳ Internal Index: (timestamp, key)
> Eviction → Scan index for expired timestamps → Delete from main store +
> remove index entries
>
>
> This makes cleanup far more efficient, as we only touch the subset of keys
> that are actually expired, not the whole store. There’s no issue with
> partitioning, because both stores (main and index) would be local per task.
> Each task owns its own timestamp index alongside its main store, so
> expiration happens per partition, fully aligned with how Kafka Streams
> already handles changelog restoration.
> In other words, both the main and index stores share the same changelog
> topic and partition mapping, so during recovery they move together and
> stay consistent. The index key is (timestamp, key): the timestamp as the
> primary sort key, and the original key to ensure uniqueness.
> That makes it efficient to scan “everything older than now minus TTL”
> using RocksDB’s natural key ordering.
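Assuming such a design, the index-key encoding could look like the sketch
below (all names hypothetical): an 8-byte big-endian timestamp prefix makes
RocksDB's lexicographic byte order double as timestamp order, so expired
entries form a contiguous range at the front of the index.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical codec for the (timestamp, key) index entries described
// above. Big-endian encoding of non-negative epoch-millis timestamps
// sorts correctly under RocksDB's unsigned lexicographic comparator.
class TtlIndexKey {

    static byte[] encode(long timestampMs, byte[] key) {
        return ByteBuffer.allocate(8 + key.length)
                .putLong(timestampMs) // big-endian by default
                .put(key)
                .array();
    }

    static long timestampOf(byte[] indexKey) {
        return ByteBuffer.wrap(indexKey, 0, 8).getLong();
    }

    static byte[] keyOf(byte[] indexKey) {
        return Arrays.copyOfRange(indexKey, 8, indexKey.length);
    }

    // Exclusive upper bound for "everything older than cutoff": every
    // index key with a smaller timestamp prefix sorts below this value.
    static byte[] upperBound(long cutoffMs) {
        return ByteBuffer.allocate(8).putLong(cutoffMs).array();
    }
}
```

Eviction then becomes a single range scan from the start of the index up to
upperBound(now - ttl), deleting each main-store key and index entry the
scan yields.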
>
> Thoughts?
> Best,
> Ankur
> ________________________________
> From: Nick Telford <[email protected]>
> Sent: Tuesday, October 7, 2025 3:17 PM
> To: [email protected] <[email protected]>
> Subject: [EXT] Re: [DISCUSS] KIP-1225: Add Optional TTL Support to Kafka
> Streams State Stores
>
> Hi everyone,
>
> I agree with Lucas that we ideally want a solution that scales to larger
> stores.
>
> My team actually use a custom RocksDB-based StateStore that implements a
> TTL using a combination of Kafka's topic retention time and RocksDB's
> TtlDB.
>
> It's not perfect, because TtlDB is based on insertion time, rather than
> record timestamp; but for our purposes it's acceptable.
>
> IMO, the ideal solution would be:
>
> 1. The ability to specify a custom timestamp when writing to RocksDB's
> TtlDB via WriteOptions. This functionality does not yet exist, so would
> need to be contributed upstream.
> 2. To sync the retention in both the changelog topic and RocksDB, but with
> a configurable margin of error (to compensate for imperfectly synchronised
> system clocks). This would be added to the changelog retention time, such
> that the changelog is retained for (slightly) longer than the local store
> retention.
> 3. When admitting records from the changelog into the store, any records
> with a timestamp older than the store retention are then simply discarded.
>
> This ensures that the local store's retention dictates the availability of
> records.
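The arithmetic behind steps 2 and 3 is small enough to sketch directly (the
class and method names here are hypothetical):

```java
// Sketch of the retention arithmetic from steps 2 and 3 above; names are
// hypothetical, only the arithmetic mirrors the proposal.
class RetentionSketch {

    // Step 2: retain the changelog slightly longer than the local store,
    // to absorb clock skew between brokers and application hosts.
    static long changelogRetentionMs(long storeRetentionMs, long clockSkewMarginMs) {
        return storeRetentionMs + clockSkewMarginMs;
    }

    // Step 3: admit a restored record only if it is still inside the
    // store's retention window; otherwise it is simply discarded.
    static boolean admit(long recordTimestampMs, long nowMs, long storeRetentionMs) {
        return recordTimestampMs >= nowMs - storeRetentionMs;
    }
}
```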
>
> We would also, ideally, have an implementation for InMemoryKeyValueStore
> that can efficiently evict old entries.
>
> Regarding the KIP: I think it would be good to add methods to the
> Materialized interface, to enable the DSL to define a TTL without having to
> supply a full StoreBuilder.
>
> Regards,
>
> Nick
>
> On Tue, 7 Oct 2025 at 08:51, Lucas Brutschy <[email protected]>
> wrote:
>
> > Hi Ankur,
> >
> > Thanks for the KIP! A built-in approach for TTLs in state store is an
> > important extension of Kafka Streams. Indeed, it does not seem ideal
> > that people must implement this manually by writing clean-up
> > tombstones from a punctuation, particularly because it requires
> > scanning the entire state store and will not scale to larger state
> > stores.
> >
> > I like how your KIP makes it extremely simple to set up a TTL by just
> > calling withTtl on the state store. However, the approach to expiring
> > records is still based on periodic scans of the entire state store.
> > Additionally, the punctuation interval appears to be hardcoded to one
> > minute.
> >
> > I was wondering if you have explored alternative implementations to
> > expire keys in a way that scales to larger stores as well? One idea is
> > to use RocksDB's "best effort" Compaction Filter and retention on the
> > changelog topic to reclaim space based on the TTL. Then, ensure a
> > consistent view of the local state store and a state store restored
> > from the changelog topic by precisely enforcing the TTL when the state
> > store is accessed. Another idea is to change the underlying RocksDB
> > representation to perform periodic expiration more efficiently, for
> > example, by having an index by timestamp and expiring keys using a
> > prefix scan on that index.
> >
> > These are vague ideas that require detailed validation. But I am not
> > sure that we should introduce a TTL expiration mechanism that doesn't
> > scale to larger state stores before exploring these options.
> >
> > WDYT?
> >
> > Cheers,
> > Lucas
> >
> > On Tue, Oct 7, 2025 at 7:26 AM Ankur Sinha <[email protected]>
> > wrote:
> > >
> > > Hello,
> > >
> > > I’d like to start the discussion 🙂 for KIP-1225: Add Optional TTL
> > > Support to Kafka Streams State Stores.
> > >
> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-19759
> > > KIP Wiki:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1225%3A+Add+Optional+TTL+Support+to+Kafka+Streams+State+Stores
> > >
> > > Regards,
> > > Ankur Sinha
> > >
> > > ________________________________
> > > From: Matthias J. Sax <[email protected]>
> > > Sent: Tuesday, October 7, 2025 1:04 AM
> > > To: [email protected] <[email protected]>
> > > Subject: [EXT] Re: [DISCUSS] KIP-19759: Add built-in TTL (Time-to-Live)
> > support for Kafka Streams State Stores
> > >
> > > If you want to start a KIP, you need to write one :) A Jira ticket is
> > > not a KIP
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 10/6/25 10:26 AM, Ankur Sinha wrote:
> > > > Hi all,
> > > >
> > > > I’d like to start the discussion for KAFKA-19759: Add built-in TTL
> > (Time-to-Live) support for Kafka Streams State Stores.
> > > >
> > > > JIRA: https://issues.apache.org/jira/browse/KAFKA-19759
> > > > KIP Wiki: (will be created shortly)
> > > >
> > > > Streams users often need per-key TTL functionality (e.g., for
> > cache-like state, per-key deduplication, or automatic cleanup).
> > > > Currently, this requires manual punctuators and tombstones.
> > > >
> > > > This KIP proposes adding a built-in `withTtl(Duration ttl)` option to
> > state stores,
> > > > which would handle key expiration and changelog consistency
> > automatically.
> > > >
> > > > Please see the JIRA for full details and general analysis.
> > > > Looking forward to the discussion.
> > > >
> > > > Best regards,
> > > > Ankur Sinha
> > > >
> >
>
