This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 70a5dc151c9 KAFKA-20335: Document KIP-1271 headers-aware state stores (#21840)
70a5dc151c9 is described below

commit 70a5dc151c96ac5044b51d47790f3baad5e3ebe9
Author: nileshkumar3 <[email protected]>
AuthorDate: Tue May 26 19:22:35 2026 -0500

    KAFKA-20335: Document KIP-1271 headers-aware state stores (#21840)
    
    Updates docs for KIP-1271 for the new header state-stores.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
    <[email protected]>
---
 .../streams/developer-guide/interactive-queries.md | 36 ++++++++++++++--------
 docs/streams/developer-guide/processor-api.md      | 31 ++++++++++++++-----
 docs/streams/upgrade-guide.md                      | 18 +++++++++++
 3 files changed, 65 insertions(+), 20 deletions(-)
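
The upgrade-guide text in this patch states that, for stores adopting the header-aware format, legacy rows are read with empty header sets until rewritten and RocksDB-backed stores migrate data lazily on access. The following is a minimal, stdlib-only Java sketch of those semantics, under stated assumptions: it is a toy in-memory model, not Kafka Streams code and not how the RocksDB stores are implemented. All names (`LazyHeadersMigrationSketch`, `HeaderEntry`, `ValueTsHeaders`, `LegacyRow`, `seedLegacy`, `isLegacy`) are hypothetical. It also assumes migration happens on read, i.e. a legacy row is converted to the header-aware shape, with no headers, the first time it is accessed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy, in-memory model of lazy header migration. Hypothetical names only;
 * this is NOT Kafka Streams code and NOT the RocksDB store implementation.
 */
final class LazyHeadersMigrationSketch {

    // Hypothetical stand-in for a single record header.
    record HeaderEntry(String key, String value) {}

    // Hypothetical stand-in for "value + timestamp + headers".
    record ValueTsHeaders(String value, long timestamp, List<HeaderEntry> headers) {}

    // Row shape written before the upgrade: value and timestamp only.
    record LegacyRow(String value, long timestamp) {}

    private final Map<String, LegacyRow> legacyRows = new HashMap<>();
    private final Map<String, ValueTsHeaders> headerRows = new HashMap<>();

    // Simulates data that was already on disk before switching store format.
    void seedLegacy(String key, String value, long timestamp) {
        legacyRows.put(key, new LegacyRow(value, timestamp));
    }

    // New writes always use the header-aware shape and drop any legacy copy.
    void put(String key, ValueTsHeaders row) {
        legacyRows.remove(key);
        headerRows.put(key, row);
    }

    // Reads prefer the new shape; a legacy row is converted on first access
    // (with an empty header list) and moved into the header-aware map.
    ValueTsHeaders get(String key) {
        ValueTsHeaders current = headerRows.get(key);
        if (current != null) {
            return current;
        }
        LegacyRow old = legacyRows.remove(key);
        if (old == null) {
            return null; // key never written in either format
        }
        ValueTsHeaders migrated = new ValueTsHeaders(old.value(), old.timestamp(), List.of());
        headerRows.put(key, migrated);
        return migrated;
    }

    // True while a key still exists only in the pre-upgrade shape.
    boolean isLegacy(String key) {
        return legacyRows.containsKey(key);
    }
}
```

In this model a later `put` that carries headers replaces the migrated row; until then readers just see an empty header list, which is how the sketch mirrors the "empty header sets until rewritten" wording.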

diff --git a/docs/streams/developer-guide/interactive-queries.md b/docs/streams/developer-guide/interactive-queries.md
index 48ec2af84e0..e1f8033f9e1 100644
--- a/docs/streams/developer-guide/interactive-queries.md
+++ b/docs/streams/developer-guide/interactive-queries.md
@@ -123,11 +123,11 @@ Supported
 Not supported (you must configure)
 </td> </tr> </table>
 
-# Querying local state stores for an app instance
+# Querying local state stores for an app instance {#querying-local-state-stores-for-an-app-instance}
 
 A Kafka Streams application typically runs on multiple instances. The state 
that is locally available on any given instance is only a subset of the 
[application's entire state](../architecture.html#streams-architecture-state). 
Querying the local stores on an instance will only return data locally 
available on that particular instance.
 
-The method `KafkaStreams#store(...)` finds an application instance's local 
state stores by name and type. Note that interactive queries are not supported 
for [versioned state 
stores](processor-api.html#streams-developer-guide-state-store-versioned) at 
this time.
+The method `KafkaStreams#store(...)` finds an application instance's local 
state stores by name and type. Note that interactive queries are not supported 
for [versioned state 
stores](/{version}/streams/developer-guide/processor-api/#versioned-key-value-state-stores)
 at this time.
 
 ![](/43/images/streams-interactive-queries-api-01.png)
 
@@ -135,12 +135,16 @@ Every application instance can directly query any of its local state stores.
 
 The _name_ of a state store is defined when you create the store. You can 
create the store explicitly by using the Processor API or implicitly by using 
stateful operations in the DSL.
 
-The _type_ of a state store is defined by `QueryableStoreType`. You can access 
the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently 
has two built-in types:
-
-  * A key-value store `QueryableStoreTypes#keyValueStore()`, see Querying 
local key-value stores.
-  * A window store `QueryableStoreTypes#windowStore()`, see Querying local 
window stores.
-
+The _type_ of a state store is defined by `QueryableStoreType`. Pass a 
built-in implementation from 
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
 as the second argument to `KafkaStreams#store(...)`. The available built-in 
helpers are:
 
+  * **`QueryableStoreTypes#keyValueStore()`** — see [Querying local key-value 
stores](#querying-local-key-value-stores).
+  * **`QueryableStoreTypes#timestampedKeyValueStore()`** — see [Querying local 
key-value stores](#querying-local-key-value-stores).
+  * **`QueryableStoreTypes#timestampedKeyValueStoreWithHeaders()`** — see 
[Header-aware stores and interactive 
queries](#header-aware-stores-interactive-queries).
+  * **`QueryableStoreTypes#windowStore()`** — see [Querying local window 
stores](#querying-local-window-stores).
+  * **`QueryableStoreTypes#timestampedWindowStore()`** — see [Querying local 
window stores](#querying-local-window-stores).
+  * **`QueryableStoreTypes#timestampedWindowStoreWithHeaders()`** — see 
[Header-aware stores and interactive 
queries](#header-aware-stores-interactive-queries).
+  * **`QueryableStoreTypes#sessionStore()`** — see [Querying local window 
stores](#querying-local-window-stores).
+  * **`QueryableStoreTypes#sessionStoreWithHeaders()`** — see [Header-aware 
stores and interactive queries](#header-aware-stores-interactive-queries).
 
 You can also implement your own QueryableStoreType as described in section 
Querying local custom state stores.
 
@@ -148,10 +152,16 @@ You can also implement your own QueryableStoreType as described in section Query
 
 Kafka Streams materializes one state store per stream partition. This means 
your application will potentially manage many underlying state stores. The API 
enables you to query all of the underlying stores without having to know which 
partition the data is in.
 
+<a id="header-aware-stores-interactive-queries"></a>
+
+**Note:** For a [header-aware 
store](/{version}/streams/developer-guide/processor-api/#headers-in-state-stores),
 use the **`*WithHeaders()`** entry from the list above that corresponds to 
your store type when interactive query results must include record headers.
+
 ## Querying local key-value stores
 
-To query a local key-value store, you must first create a topology with a 
key-value store. This example creates a key-value store named 
"CountsKeyValueStore". This store will hold the latest count for any word that 
is found on the topic "word-count-input".
-    
+To query key-value state, you first build a topology that includes a state 
store. This example uses the DSL `count()` operator on a grouped stream, which 
creates a timestamped key-value store named `CountsKeyValueStore`. That store 
holds the latest count for each word from the topic `word-count-input`.
+
+Note: These examples use `QueryableStoreTypes.keyValueStore()` and 
`ReadOnlyKeyValueStore<String, Long>`, so interactive queries return values 
only (the counts). The materialized store also retains timestamps; use 
`QueryableStoreTypes.timestampedKeyValueStore()` and 
`ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>>` if you need timestamps 
in query results.
+
     
     Properties  props = ...;
     StreamsBuilder builder = ...;
@@ -212,8 +222,10 @@ You can also materialize the results of stateless operators by using the overloa
 
 A window store will potentially have many results for any given key because 
the key can be present in multiple windows. However, there is only one result 
per window for a given key.
 
-To query a local window store, you must first create a topology with a window 
store. This example creates a window store named "CountsWindowStore" that 
contains the counts for words in 1-minute windows.
-    
+To query a windowed store, you first build a topology with a windowed 
aggregation (for example, using `windowedBy` followed by `count()`). This 
example uses `count()` to create a timestamped window store named 
`CountsWindowStore` with 1-minute windows for per-word counts.
+
+Note: These examples use `QueryableStoreTypes.windowStore()` and 
`ReadOnlyWindowStore<String, Long>`, so interactive queries return values only 
per window. The materialized store also retains timestamps; use 
`QueryableStoreTypes.timestampedWindowStore()` and `ReadOnlyWindowStore<String, 
ValueAndTimestamp<Long>>` if you need timestamps in query results.
+
     
     StreamsBuilder builder = ...;
     KStream<String, String> textLines = ...;
@@ -249,7 +261,7 @@ After the application has started, you can get access to "CountsWindowStore" and
 
 **Note**
 
-Only the [Processor 
API](processor-api.html#streams-developer-guide-processor-api) supports custom 
state stores.
+Only the [Processor 
API](/{version}/streams/developer-guide/processor-api/#implementing-custom-state-stores)
 supports custom state stores.
 
 Before querying the custom state stores you must implement these interfaces:
 
diff --git a/docs/streams/developer-guide/processor-api.md b/docs/streams/developer-guide/processor-api.md
index c67c42fcb3a..9fcf5d0e3a4 100644
--- a/docs/streams/developer-guide/processor-api.md
+++ b/docs/streams/developer-guide/processor-api.md
@@ -173,11 +173,16 @@ Yes (enabled by default)
   * Stores its data on local disk.
   * Storage capacity: managed local state can be larger than the memory (heap 
space) of an application instance, but must fit into the available local disk 
space.
   * RocksDB settings can be fine-tuned, see [RocksDB 
configuration](config-streams.html#streams-developer-guide-rocksdb-config).
-  * Available [store 
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)):
 timestamped key-value store, versioned key-value store, time window key-value 
store, session window key-value store.
-  * Use 
[persistentTimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\))
 when you need a persistent key-(value/timestamp) store that supports 
put/get/delete and range queries.
-  * Use 
[persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\))
 when you need a persistent, versioned key-(value/timestamp) store that 
supports put/get/delete and timestamped get operations.
-  * Use 
[persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
 or 
[persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
 when you need a persistent timeWindowedKey-value or 
timeWindowedKey-(value/timestamp) store, respectively.
-  * Use 
[persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\))
 when you need a persistent sessionWindowedKey-value store.
+  * Available [persistent store 
variants](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)>):
 plain key-value store (values only — no embedded record timestamp in state), 
timestamped key-value store, versioned key-value store, windowed store, session 
store. Header-aware variants are also described below.
+  * Use 
[persistentKeyValueStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)>)
 when you need a persistent plain key-value store (no embedded record 
timestamp).
+  * Use 
[persistentTimestampedKeyValueStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)>)
 when you need a persistent key-(value/timestamp) store that supports 
put/get/delete and range queries.
+  * Use 
[persistentVersionedKeyValueStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)>)
 when you need a persistent, versioned key-(value/timestamp) store that 
supports put/get/delete and timestamped get operations.
+  * Use 
[persistentWindowStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)>)
 or 
[persistentTimestampedWindowStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)>)
 when you need a persistent plain windowed store or a persiste [...]
+  * Use 
[persistentSessionStore](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore(java.lang.String,java.time.Duration)>)
 when you need a persistent session store.
+  * **Headers:** To persist [record 
headers](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/processor/api/Record.html#headers()>)
 in state, use a `WithHeaders` store supplier together with its corresponding 
`StoreBuilder` factory (see [Headers in State Stores](#headers-in-state-stores) 
below). There are no `WithHeaders` suppliers for plain persistent key-value or 
plain persistent windowed stores. `WithHeaders` suppliers exist only for 
persistent timestamped key-valu [...]
+  * Use 
[persistentTimestampedKeyValueStoreWithHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStoreWithHeaders(java.lang.String)>)
 with 
[timestampedKeyValueStoreWithHeadersBuilder](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#timestampedKeyValueStoreWithHeadersBuilder(org.apache.kafka.streams.state.KeyValueBytesStoreSupplier,org.apache.kafka.common.serialization.Serde,o
 [...]
+  * Use 
[persistentTimestampedWindowStoreWithHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStoreWithHeaders(java.lang.String,java.time.Duration,java.time.Duration,boolean)>)
 with 
[timestampedWindowStoreWithHeadersBuilder](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#timestampedWindowStoreWithHeadersBuilder(org.apache.kafka.streams.state.WindowBytesStoreSupplier,org.apach
 [...]
+  * Use 
[persistentSessionStoreWithHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStoreWithHeaders(java.lang.String,java.time.Duration)>)
 with 
[sessionStoreWithHeadersBuilder](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#sessionStoreWithHeadersBuilder(org.apache.kafka.streams.state.SessionBytesStoreSupplier,org.apache.kafka.common.serialization.Serde,org.apache.kafka.common.seriali
 [...]
 
 
     
@@ -217,7 +222,7 @@ Yes (enabled by default)
   * Stores its data in memory.
   * Storage capacity: managed local state must fit into memory (heap space) of 
an application instance.
   * Useful when application instances run in an environment where local disk 
space is either not available or local disk space is wiped in-between app 
instance restarts.
-  * Available [store 
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-):
 time window key-value store, session window key-value store.
+  * Available [in-memory store 
variants](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore(java.lang.String)>):
 plain key-value store, windowed store, session store.
   * Use 
[TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)
 when you need a key-(value/timestamp) store that supports put/get/delete and 
range queries.
   * Use 
[TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)
 when you need to store windowedKey-(value/timestamp) pairs.
   * There is no built-in in-memory, versioned key-value store at this time.
@@ -299,9 +304,19 @@ You can query timestamped state stores both with and without a timestamp.
   * For DSL operators, store data is upgraded lazily in the background.
   * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you 
can opt-in by implementing the 
[TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html)
 interface. In this case, the old format is retained, and Streams uses a proxy 
store that removes/adds timestamps on read/write.
 
+## Headers in State Stores {#headers-in-state-stores}
 
+You can materialize Kafka [record 
headers](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/processor/api/Record.html#headers()>)
 into RocksDB-backed state together with keys and values. Plain persistent 
key-value stores keep values without an embedded record timestamp; suppliers 
for timestamped key-value, windowed, or session semantics expose timestamps 
according to each store type. Use this when downstream processing needs access 
to record headers from prior input — [...]
 
-## Versioned Key-Value State Stores
+Only persistent, RocksDB-backed suppliers exist for header-aware stores (the 
`Stores` factory names start with `persistent` and end with `WithHeaders`). 
+
+Use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) 
methods whose names end with `WithHeaders`, each paired with the corresponding 
`StoreBuilder` factory. For example, pair 
`persistentTimestampedKeyValueStoreWithHeaders` with 
`timestampedKeyValueStoreWithHeadersBuilder`, 
`persistentTimestampedWindowStoreWithHeaders` with 
`timestampedWindowStoreWithHeadersBuilder`, and 
`persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`.
+
+Key-value and window reads return 
[ValueTimestampHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html>),
 which combines the value, its timestamp, and the associated headers. 
[SessionStoreWithHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html>)
 stores session aggregations as 
[AggregationWithHeaders](<https://kafka.apache.org/{version}/javadoc/org/apache/kafka/streams/sta
 [...]
+
+**Upgrade note:** Rolling bounce, changelog compatibility, lazy on-disk 
migration, performance trade-offs, and downgrade constraints are covered in the 
[Kafka Streams upgrade 
guide](../../upgrade-guide/#kip-1271-headers-aware-stores).
+
+## Versioned Key-Value State Stores {#versioned-key-value-state-stores}
 
 Versioned key-value state stores are available since Kafka Streams 3.5. Rather 
than storing a single record version (value and timestamp) per key, versioned 
state stores may store multiple record versions per key. This allows versioned 
state stores to support timestamped retrieval operations to return the latest 
record (per key) as of a specified timestamp.
 
@@ -334,7 +349,7 @@ A read-only state store materialized the data from its input topic. It also uses
 
 **note:** beware of the partitioning requirements when using read-only state 
stores for lookups during processing. You might want to make sure the original 
changelog topic is co-partitioned with the processors reading the read-only 
statestore.
 
-## Implementing Custom State Stores
+## Implementing Custom State Stores {#implementing-custom-state-stores}
 
 You can use the built-in state store types or implement your own. The primary 
interface to implement for the store is 
`org.apache.kafka.streams.processor.StateStore`. Kafka Streams also has a few 
extended interfaces such as `KeyValueStore` and `VersionedKeyValueStore`.
 
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index da69b5c8af8..47e91c31c4f 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -75,6 +75,24 @@ Kafka Streams now allows to purge local state directories and checkpoint files d
 
 Kafka Streams now persists state store changelog offsets inside each state 
store rather than in a single per-task `.checkpoint` file 
([KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets)).
 This is an internal infrastructure change and is transparent to most users — 
existing per-task `.checkpoint` files are migrated automatically on first 
startup, and no application or operator action is required. EOS crash behavior 
is unchanged in [...]
 
+### Header-aware state stores for the Processor API (KIP-1271) {#kip-1271-headers-aware-stores}
+
+Kafka Streams adds **header-aware** state stores. Opt in with the new `Stores` 
suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` 
factories. For example:
+
+- `persistentTimestampedKeyValueStoreWithHeaders` with 
`timestampedKeyValueStoreWithHeadersBuilder`
+- `persistentTimestampedWindowStoreWithHeaders` with 
`timestampedWindowStoreWithHeadersBuilder`
+- `persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`
+
+See the [Processor API state store 
documentation](/{version}/streams/developer-guide/processor-api/#headers-in-state-stores).
+
+Existing applications that keep using the same headerless `Stores` suppliers 
and builders are unaffected: storage format, changelogs, and performance stay 
as before.
+
+For stores that adopt the header-aware format, KIP-1271 defines a single 
rolling-bounce upgrade: the changelog topic format is unchanged, legacy rows 
are read with empty header sets until rewritten, and RocksDB-backed stores 
migrate data lazily on access. Downgrading in place after migration is not 
supported except by clearing local store data and restoring from the changelog.
+
+Storing headers increases disk and serialization cost versus headerless 
stores; the KIP discusses lazy header parsing and other performance 
considerations.
+
+`TopologyTestDriver` and Interactive Queries support the new store types. The 
existing `store()` facades continue to return values (or `ValueAndTimestamp`) 
without exposing record headers. See the [interactive queries 
guide](/{version}/streams/developer-guide/interactive-queries/#header-aware-stores-interactive-queries).
+
 ### Deprecation of streams-scala module (KIP-1244)
 
 The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is 
deprecated in 4.3.0 and will be removed in 5.0.
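
The interactive-queries changes in this patch describe several query types over the same materialized data: `QueryableStoreTypes.keyValueStore()` returns values only, `timestampedKeyValueStore()` returns `ValueAndTimestamp`, and the `*WithHeaders()` types are the ones to use when results must include record headers. The toy Java model below illustrates only that idea of several read views over one store. `StoreViewsSketch`, `Stored`, `ValueTs`, `valueOnly`, `valueAndTimestamp`, and `withHeaders` are hypothetical names, not the Kafka Streams API; the real store and result types are the ones linked from the `QueryableStoreTypes` javadoc.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model: one materialized store, three read-only views.
 * Hypothetical names; NOT the Kafka Streams API.
 */
final class StoreViewsSketch {

    // What the hypothetical store keeps per key: value, timestamp, headers.
    record Stored(long value, long timestamp, Map<String, String> headers) {}

    // Analogue of a value-and-timestamp result with headers hidden.
    record ValueTs(long value, long timestamp) {}

    private final Map<String, Stored> data = new HashMap<>();

    void put(String key, long value, long timestamp, Map<String, String> headers) {
        // Defensive copy so callers cannot mutate stored headers afterwards.
        data.put(key, new Stored(value, timestamp, Map.copyOf(headers)));
    }

    // Values-only view (compare QueryableStoreTypes.keyValueStore()).
    Long valueOnly(String key) {
        Stored s = data.get(key);
        return s == null ? null : s.value();
    }

    // Value-and-timestamp view (compare timestampedKeyValueStore()).
    ValueTs valueAndTimestamp(String key) {
        Stored s = data.get(key);
        return s == null ? null : new ValueTs(s.value(), s.timestamp());
    }

    // Full view including headers (compare the *WithHeaders() query types).
    Stored withHeaders(String key) {
        return data.get(key);
    }
}
```

The design point the sketch is meant to convey is that choosing a narrower view discards information at read time; it does not change what was materialized.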
