Ankur Sinha created KAFKA-19759:
-----------------------------------
Summary: Add built-in TTL (Time-to-Live) support for Kafka Streams State Stores
Key: KAFKA-19759
URL: https://issues.apache.org/jira/browse/KAFKA-19759
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Ankur Sinha
In many business use cases, Kafka Streams users need *per-key Time-To-Live
(TTL)* behavior for state stores such as KeyValueStore or KTable, typically
to model cache-like or deduplication scenarios.
Today, achieving this requires *manual handling* using:
* Custom timestamp tracking per key,
* Punctuators to periodically scan and remove expired entries, and
* Manual emission of tombstones to maintain changelog consistency.
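To make this boilerplate concrete, here is a minimal, stdlib-only model of what each application writes today. `ManualTtlProcessor`, `process`, and `punctuate` are hypothetical names. In a real topology the two maps would be a changelogged KeyValueStore and `punctuate` would be a Punctuator registered with ProcessorContext#schedule. The changelog is modeled as a list in which a `null` value stands for a tombstone, and "expired" is assumed to mean strictly older than the TTL.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of today's manual TTL boilerplate.
// In a real topology, `values`/`lastUpdated` would be a changelogged KeyValueStore
// and punctuate() would be a Punctuator registered via ProcessorContext#schedule.
class ManualTtlProcessor {
    private final long ttlMs;
    private final Map<String, String> values = new LinkedHashMap<>();
    // Custom per-key timestamp tracking.
    private final Map<String, Long> lastUpdated = new LinkedHashMap<>();
    // Stand-in for the changelog topic; a null value represents a tombstone.
    final List<Map.Entry<String, String>> changelog = new ArrayList<>();

    ManualTtlProcessor(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void process(String key, String value, long timestampMs) {
        values.put(key, value);
        lastUpdated.put(key, timestampMs);
        changelog.add(new AbstractMap.SimpleEntry<String, String>(key, value));
    }

    // Periodic full scan: remove every key older than the TTL and emit a tombstone for it.
    void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastUpdated.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > ttlMs) {
                values.remove(entry.getKey());
                it.remove();
                changelog.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), null));
            }
        }
    }

    String get(String key) {
        return values.get(key);
    }
}
```

Every application that needs TTL ends up re-creating some variant of these three pieces, which is the duplication the proposal aims to remove.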
These workarounds are:
* {*}Inconsistent across applications{*}, and
* {*}Operationally costly{*}, as each developer must reimplement the same
logic.
The *proposal* is to introduce a *built-in TTL mechanism* for Kafka Streams state
stores, allowing records to expire automatically after a configured duration.
This would add new APIs such as:
StoreBuilder<T> withTtl(Duration ttl);
Materialized<K, V, S> withTtl(Duration ttl);
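One way such an option could stay strictly opt-in is sketched below. `StoreOptions` is an illustrative name, not a Kafka Streams class, and returning a new instance from `withTtl` and rejecting non-positive durations are design choices of this sketch, not part of the proposal. When the TTL option is never called, the TTL is absent and the store would behave exactly as it does today.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: StoreOptions is illustrative, not a Kafka Streams class.
// When withTtl(...) is never called, ttl() is empty and behaviour is unchanged.
final class StoreOptions {
    private final String name;
    private final Duration ttl; // null = no TTL (current behaviour)

    private StoreOptions(String name, Duration ttl) {
        this.name = Objects.requireNonNull(name, "name");
        this.ttl = ttl;
    }

    static StoreOptions as(String name) {
        return new StoreOptions(name, null);
    }

    // Returns a new instance; rejects null, zero, and negative durations up front.
    StoreOptions withTtl(Duration ttl) {
        Objects.requireNonNull(ttl, "ttl");
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("ttl must be positive: " + ttl);
        }
        return new StoreOptions(name, ttl);
    }

    String name() {
        return name;
    }

    Optional<Duration> ttl() {
        return Optional.ofNullable(ttl);
    }
}
```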
When configured:
* Each record’s timestamp (from event-time or processing-time) is tracked.
* Expired keys are automatically evicted by a periodic punctuation (scheduled
via ProcessorContext#schedule()).
* Corresponding tombstones are written to the changelog topic.
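The choice between event-time and processing-time matters for when eviction can happen. In Kafka Streams terms these correspond to PunctuationType.STREAM_TIME (stream time is the largest record timestamp seen so far and only advances when records arrive) and PunctuationType.WALL_CLOCK_TIME. The sketch below uses the hypothetical names `ExpiryClock` and `ExpiryTracker` to show the consequence: under stream time, keys on a partition that receives no new records never expire, however much real time passes.

```java
// Hypothetical sketch; ExpiryClock and ExpiryTracker are illustrative names only.
// The two modes mirror PunctuationType.STREAM_TIME and PunctuationType.WALL_CLOCK_TIME.
enum ExpiryClock { STREAM_TIME, WALL_CLOCK_TIME }

final class ExpiryTracker {
    private final ExpiryClock clock;
    private final long ttlMs;
    private long streamTimeMs;
    private boolean seenRecord;

    ExpiryTracker(ExpiryClock clock, long ttlMs) {
        this.clock = clock;
        this.ttlMs = ttlMs;
    }

    // Stream time is the largest record timestamp observed so far; it never moves backwards.
    void observe(long recordTimestampMs) {
        streamTimeMs = seenRecord ? Math.max(streamTimeMs, recordTimestampMs) : recordTimestampMs;
        seenRecord = true;
    }

    // wallClockMs is passed in (instead of reading System.currentTimeMillis())
    // to keep the sketch deterministic; it is ignored in STREAM_TIME mode.
    boolean isExpired(long entryTimestampMs, long wallClockMs) {
        if (clock == ExpiryClock.STREAM_TIME) {
            // With no observed record there is no stream time, so nothing expires.
            return seenRecord && streamTimeMs - entryTimestampMs > ttlMs;
        }
        return wallClockMs - entryTimestampMs > ttlMs;
    }
}
```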
This feature would provide a *TTL abstraction* that simplifies common use cases
such as:
* Maintaining cache-like state (e.g., last-seen values with limited lifespan)
* Automatically purging inactive or stale keys without manual cleanup.
Benefits:
* Consistency: automatic changelog tombstones preserve correctness
across rebalances and restores.
* Less boilerplate: no punctuator code is needed for manual expiration.
* Backward compatibility: TTL is optional and opt-in, so existing stores
remain unaffected.
Example usage with the KTable / state store interface:
KTable<String, UserSession> sessions = builder.table(
    "sessions",
    Materialized.<String, UserSession, KeyValueStore<Bytes, byte[]>>as("session-store")
        .withTtl(Duration.ofHours(1))
        .withValueSerde(userSessionSerde));
Here, session entries older than 1 hour will be automatically expired: deleted
from the local RocksDB store and tombstoned in the changelog topic.
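"Older than 1 hour" fixes when an entry becomes expired, but with punctuation-driven eviction it is only removed at the next punctuation after that point. An entry can therefore stay readable for up to roughly one punctuation interval beyond its TTL, unless reads also filter expired entries, which the proposal does not specify. The hypothetical helper below computes that first eviction tick, assuming punctuations fire at exact multiples of the interval and all inputs are non-negative.

```java
// Hypothetical helper (illustrative name): earliest time at which a punctuation-based
// eviction can remove an entry. Assumes punctuations fire at exact multiples of
// intervalMs, inputs are non-negative, and "expired" means age strictly > TTL.
final class EvictionTiming {
    static long firstEvictionTick(long writtenAtMs, long ttlMs, long intervalMs) {
        long expiryBoundaryMs = writtenAtMs + ttlMs;             // expired once now > this
        return (expiryBoundaryMs / intervalMs + 1) * intervalMs; // first tick strictly after it
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long minute = 60_000L;
        // Written at t=0 or t=30s, the entry is removed by the punctuation at t=61min.
        System.out.println(firstEvictionTick(0, hour, minute));      // 3660000
        System.out.println(firstEvictionTick(30_000, hour, minute)); // 3660000
    }
}
```

With a one-minute punctuation interval, a session written at t=0 survives until t=61 min, so the effective lifetime is bounded by TTL plus one interval; a shorter interval tightens that bound at the cost of more frequent scans.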
--
This message was sent by Atlassian Jira
(v8.20.10#820010)