This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 9bc4f798740 KAFKA-20336: Add documentation DSL headers-aware state stores (#21905)
9bc4f798740 is described below
commit 9bc4f798740345d456ac7d3f1169f747ea170084
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Wed Jun 3 04:35:57 2026 +0530
KAFKA-20336: Add documentation DSL headers-aware state stores (#21905)
Add documentation for KIP-1285, for DSL headers-aware state stores.
Reviewers: Mickael Maison <[email protected]>, Alieh Saeedi
<asaeedi@confluentio>, Matthias J. Sax <[email protected]>
---
docs/getting-started/upgrade.md | 1 +
docs/streams/developer-guide/config-streams.md | 33 ++++++++++++++++++++++++++
docs/streams/developer-guide/dsl-api.md | 24 +++++++++++++++----
docs/streams/upgrade-guide.md | 30 ++++++++++++++++++++++-
4 files changed, 82 insertions(+), 6 deletions(-)
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 93e04c828f5..12a114093ab 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -46,6 +46,7 @@ Note: Apache Kafka 4.3 only supports KRaft mode - ZooKeeper mode has been remove
* A new config has been introduced: `remote.log.metadata.topic.min.isr`,
with 2 as the default value. You can correct the min.insync.replicas for the
existing __remote_log_metadata topic via kafka-configs.sh if needed. For further
details, please refer to
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
* The new config prefix `remote.log.metadata.admin.` has been introduced. It
allows independent configuration of the admin client used by
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
* The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will
be removed in Kafka 5.0. For further details, please refer to the [migration
guide](/{version}/streams/developer-guide/scala-migration).
+ * Kafka Streams now supports opt-in headers-aware state stores for DSL
operators via the new `dsl.store.format` config. The config accepts `DEFAULT`
or `HEADERS`. Store-supplier APIs use the `DslStoreFormat` enum, whose values
are `PLAIN`, `TIMESTAMPED`, and `HEADERS`; `DslStoreFormat.DEFAULT` does not
exist. This builds on the headers-aware state store implementations introduced
in [KIP-1271](https://cwiki.apache.org/confluence/x/QIM8G). For further details
and current DSL result-head [...]
* Support for cordoning log directories: For further details, please refer
to [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg).
* The `group.coordinator.rebalance.protocols` configuration is deprecated
and will be removed in Kafka 5.0. In Kafka 5.0, all protocols will always be
enabled and controlled solely by feature versions (`group.version`,
`streams.version`, `share.version`) via `kafka-features.sh`. For further
details, please refer to
[KIP-1237](https://cwiki.apache.org/confluence/x/jIqmFw).
* New group configs have been introduced: `share.delivery.count.limit`,
`share.partition.max.record.locks` and `share.renew.acknowledge.enable`, along
with equivalent broker configs for specifying minimum and maximum values. In
addition, the validation of group configs has been improved. For further
details, please refer to
[KIP-1240](https://cwiki.apache.org/confluence/x/tIHMFw).
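The upgrade.md note above distinguishes the `dsl.store.format` config values (`DEFAULT`, `HEADERS`) from the `DslStoreFormat` enum constants (`PLAIN`, `TIMESTAMPED`, `HEADERS`). A minimal sketch of that mapping, assuming each operator has a pre-existing plain or timestamped variant that `DEFAULT` keeps. `ModelStoreFormat`, `ModelConfigValue`, and `StoreFormatResolver` are hypothetical names for illustration only, not Kafka classes:

```java
import java.util.Locale;

// Hypothetical mirror of the documented DslStoreFormat constants (not the Kafka enum).
enum ModelStoreFormat { PLAIN, TIMESTAMPED, HEADERS }

// Hypothetical mirror of the accepted dsl.store.format string values.
enum ModelConfigValue { DEFAULT, HEADERS }

final class StoreFormatResolver {
    private StoreFormatResolver() { }

    // Parses a dsl.store.format value case-insensitively; any other value is rejected.
    static ModelConfigValue parse(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("dsl.store.format must not be null");
        }
        switch (raw.toUpperCase(Locale.ROOT)) {
            case "DEFAULT":
                return ModelConfigValue.DEFAULT;
            case "HEADERS":
                return ModelConfigValue.HEADERS;
            default:
                throw new IllegalArgumentException("Unsupported dsl.store.format: " + raw);
        }
    }

    // DEFAULT keeps the operator's existing plain or timestamped variant;
    // HEADERS selects the headers-aware variant.
    static ModelStoreFormat resolve(ModelConfigValue config, ModelStoreFormat operatorDefault) {
        if (operatorDefault == ModelStoreFormat.HEADERS) {
            throw new IllegalArgumentException("operatorDefault must be PLAIN or TIMESTAMPED");
        }
        return config == ModelConfigValue.HEADERS ? ModelStoreFormat.HEADERS : operatorDefault;
    }
}
```

With this sketch, `resolve(parse("headers"), ModelStoreFormat.TIMESTAMPED)` returns `HEADERS`, while `resolve(parse("Default"), ModelStoreFormat.PLAIN)` returns `PLAIN`. A string such as `"TIMESTAMPED"` is rejected because it is an enum constant, not an accepted config value.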
diff --git a/docs/streams/developer-guide/config-streams.md b/docs/streams/developer-guide/config-streams.md
index aafc3e68aae..5f2c03376f9 100644
--- a/docs/streams/developer-guide/config-streams.md
+++ b/docs/streams/developer-guide/config-streams.md
@@ -65,6 +65,7 @@ This section contains the most common Streams configuration parameters. For a fu
* default.timestamp.extractor
* default.value.serde
* deserialization.exception.handler
+ * dsl.store.format
* enable.metrics.push
* ensure.explicit.internal.resource.naming
* group.protocol
@@ -602,6 +603,23 @@ Defines a default state store implementation to be used by any stateful DSL oper
<tr>
<td>
+dsl.store.format
+</td>
+<td>
+
+Low
+</td>
+<td>
+
+Controls whether DSL operators materialize headers-aware state stores.
Case-insensitive. Accepted values: `default` (uses existing timestamped or
plain store variants per operator) and `headers` (selects headers-aware stores
that can persist record headers alongside values and timestamps; local state
can be larger than under `default`).
+</td>
+<td>
+
+`default`
+</td> </tr>
+<tr>
+<td>
+
ensure.explicit.internal.resource.naming
</td>
<td>
@@ -1359,6 +1377,21 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
>
> This is discussed in more detail in [Data types and
> serialization](datatypes.html#streams-developer-guide-serdes).
+### dsl.store.format {#dsl-store-format}
+
+> Selects the state store format used by all DSL operators that materialize a
state store. Accepted values are `DEFAULT` and `HEADERS` (case-insensitive);
the default is `DEFAULT`.
+>
+> * `DEFAULT`: Uses the existing timestamped or plain store variant per
operator. Existing applications are unaffected.
+> * `HEADERS`: Uses headers-aware stores (introduced by
[KIP-1271](https://cwiki.apache.org/confluence/x/QIM8G)) that can persist
record headers alongside the value and timestamp.
+>
+> This config is global. Per-operator customization is possible by providing a
custom `DslStoreSuppliers` via `Materialized.withStoreType(...)`, or by
supplying explicit headers-aware store suppliers. Note that `dsl.store.format`
is orthogonal to `dsl.store.suppliers.class`, which selects the store
*implementation* (e.g., RocksDB vs in-memory); the two can be set independently.
+>
+> The accepted string values are `DEFAULT` and `HEADERS` (case-insensitive).
These differ from the `DslStoreFormat` Java enum, which has constants `PLAIN`,
`TIMESTAMPED`, and `HEADERS`; `DslStoreFormat.DEFAULT` does not exist as an
enum constant.
+>
+> See [KIP-1271](https://cwiki.apache.org/confluence/x/QIM8G) for migration
procedures, changelog compatibility, restore behavior, and per-record overhead.
+>
+> **Current limitations**: `dsl.store.format=HEADERS` changes the state store
format. It does not define how DSL operators create headers for output records.
Some operators write empty headers to their materialized stores, and the buffer
stores used by `suppress()` and left/outer stream-stream joins are not
headers-aware. See [Stateful
transformations](/{version}/streams/developer-guide/dsl-api.html#stateful-transformations)
and the [Streams upgrade guide](/{version}/streams/upgrade-guid [...]
+
### ensure.explicit.internal.resource.naming
> Whether to enforce explicit naming for all internal resources of the
> topology, including internal topics (e.g., changelog and repartition topics)
> and their associated state stores. When enabled, the application will refuse
> to start if any internal resource has an auto-generated name.
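The `dsl.store.format` section in the config-streams.md hunk above notes that the format is orthogonal to `dsl.store.suppliers.class`. A minimal sketch that sets both using only `java.util.Properties` and the literal config keys. The in-memory supplier class name is assumed from the existing built-in `BuiltInDslStoreSuppliers` and should be checked against your Kafka version; `StreamsPropsExample` is a hypothetical helper:

```java
import java.util.Properties;

final class StreamsPropsExample {
    private StreamsPropsExample() { }

    // Literal config keys; real applications can use StreamsConfig constants
    // such as DSL_STORE_FORMAT_CONFIG instead.
    static final String DSL_STORE_FORMAT = "dsl.store.format";
    static final String DSL_STORE_SUPPLIERS_CLASS = "dsl.store.suppliers.class";

    static Properties headersAwareInMemory(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Store *format*: headers-aware stores for DSL operators (value is case-insensitive).
        props.setProperty(DSL_STORE_FORMAT, "headers");
        // Store *implementation*: in-memory instead of RocksDB; chosen independently of the format.
        props.setProperty(DSL_STORE_SUPPLIERS_CLASS,
            "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$InMemoryDslStoreSuppliers");
        return props;
    }
}
```

Omitting either property falls back to that config's own default, so the two choices do not constrain each other.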
diff --git a/docs/streams/developer-guide/dsl-api.md b/docs/streams/developer-guide/dsl-api.md
index 0a981f680c9..6a4d2e16711 100644
--- a/docs/streams/developer-guide/dsl-api.md
+++ b/docs/streams/developer-guide/dsl-api.md
@@ -925,6 +925,16 @@ Stateful transformations depend on state for processing inputs and producing out
+**Headers-aware state stores
([KIP-1285](https://cwiki.apache.org/confluence/x/4ow8G)):** Set
[`dsl.store.format=HEADERS`](config-streams.html#dsl-store-format) to make
supported DSL operators use headers-aware state stores. These stores can keep
record headers together with the value and timestamp.
+
+This config only changes the state store format. It does not define how DSL
operators create headers for output records. Current behavior is:
+
+ * aggregations, KTable-KTable joins, materialized `KTable.mapValues`,
`KStream.toTable()`, and `StreamsBuilder.table()` write empty headers to their
materialized stores
+ * KStream-KStream join window stores keep source-record headers, but join
result records do not get computed or merged headers; they may carry the
headers from the record that triggered the result
+ * `suppress()` and left/outer [KStream-KStream joins](#kstream-kstream-join)
use non-headers-aware buffer stores, so records passing through those buffers
lose their headers
+
+A follow-up KIP will define how DSL result headers are computed.
+
Note that state stores are fault-tolerant. In case of failure, Kafka Streams
guarantees to fully restore all state stores prior to resuming the processing.
See [Fault Tolerance](../architecture.html#streams_architecture_recovery) for
further information.
Available stateful transformations in the DSL include:
@@ -2113,9 +2123,13 @@ There are two exceptions where co-partitioning is not required. For KStream-Glob
-#### KStream-KStream Join
+#### KStream-KStream Join {#kstream-kstream-join}
+
+KStream-KStream joins are always windowed joins, because otherwise the size of
the internal state store used to perform the join - e.g., a sliding window or
"buffer" - would grow indefinitely.
-KStream-KStream joins are always windowed joins, because otherwise the size of
the internal state store used to perform the join - e.g., a sliding window or
"buffer" - would grow indefinitely. For stream-stream joins it's important to
highlight that a new input record on one side will produce a join output _for
each_ matching record on the other side, and there can be _multiple_ such
matching records in a given join window (cf. the row with timestamp 15 in the
join semantics table below, [...]
+**Note on headers-aware state stores:** With
[`dsl.store.format=HEADERS`](config-streams.html#dsl-store-format), inner
stream-stream joins use headers-aware join window stores. Left and outer
stream-stream joins also use a separate buffer store for not-yet-matched
records, and that buffer is not headers-aware. Records that pass through this
buffer lose their headers.
+
+Join output records do not get computed or merged headers. The current
forwarding path may carry the headers from the record that triggered the
output. More generally, for stream-stream joins it's important to highlight
that a new input record on one side will produce a join output _for each_
matching record on the other side, and there can be _multiple_ such matching
records in a given join window (cf. the row with timestamp 15 in the join
semantics table below, for example).
Join output records are effectively created as follows, leveraging the
user-supplied `ValueJoiner`:
@@ -4628,7 +4642,7 @@ If we then receive three additional records (including two out-of-order records)
Detected sessions after having received six input records. Note the two
out-of-order data records at t=4 (green) and t=5 (blue), which lead to a merge
of sessions and an extension of a session, respectively.
-#### Window Final Results
+#### Window Final Results {#window-final-results}
In Kafka Streams, windowed computations update their results continuously. As
new data arrives for a window, freshly computed results are emitted downstream.
For many applications, this is ideal, since fresh results are always available,
and Kafka Streams is designed to make programming continuous computations
seamless. However, some applications need to take action **only** on the final
result of a windowed computation. Common examples of this are sending alerts or
delivering results to [...]
@@ -4659,6 +4673,8 @@ The key parts of this program are:
One thing to note is that suppression is just like any other Kafka Streams
operator, so you can build a topology with two branches emerging from the
`count`, one suppressed, and one not, or even multiple differently configured
suppressions. This allows you to apply suppressions where they are needed and
otherwise rely on the default continuous update behavior.
+**Note on headers-aware state stores:** `suppress()` uses an in-memory buffer
that is not headers-aware. Record headers attached to upstream records are not
preserved across the suppression boundary, even when
[`dsl.store.format=HEADERS`](config-streams.html#dsl-store-format) is set
globally per [KIP-1285](https://cwiki.apache.org/confluence/x/4ow8G).
+
For more detailed information, see the JavaDoc on the `Suppressed` config
object and [KIP-328](https://cwiki.apache.org/confluence/x/sQU0BQ "KIP-328").
Applying processors (Processor API integration)
@@ -5934,5 +5950,3 @@ A complete example of user-defined Serdes can be found in a test class within th
* [Documentation](/documentation)
* [Kafka Streams](/documentation/streams)
* [Developer Guide](/documentation/streams/developer-guide/)
-
-
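The header behavior described in the dsl-api.md hunks above (buffers for `suppress()` and left/outer stream-stream joins dropping headers, and join results possibly carrying the triggering record's headers) can be illustrated with a toy model. It assumes a buffer that persists only key, value, and timestamp, and it uses the larger input timestamp for join results. `Rec`, `PlainBuffer`, and `HeadersModel` are hypothetical classes, not Kafka Streams internals:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical record: key, value, timestamp, and headers (header values as strings for brevity).
final class Rec {
    final String key;
    final String value;
    final long timestamp;
    final Map<String, String> headers;

    Rec(String key, String value, long timestamp, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }
}

// Hypothetical non-headers-aware buffer: it persists only key, value, and timestamp.
final class PlainBuffer {
    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    // Headers of the incoming record are not stored.
    void put(Rec r) {
        entries.addLast(new Entry(r.key, r.value, r.timestamp));
    }

    // Emitted records are rebuilt from the stored fields, so their headers are empty.
    Rec poll() {
        Entry e = entries.pollFirst();
        return e == null ? null : new Rec(e.key, e.value, e.timestamp, Collections.<String, String>emptyMap());
    }
}

final class HeadersModel {
    private HeadersModel() { }

    // Join result: joined value, larger input timestamp, and the triggering record's
    // headers as-is; headers of the other side are not merged in.
    static Rec joinResult(Rec trigger, Rec other) {
        String joined = trigger.value + "+" + other.value;
        return new Rec(trigger.key, joined, Math.max(trigger.timestamp, other.timestamp), trigger.headers);
    }
}
```

In this model, a record with a `trace-id` header that passes through `PlainBuffer` comes out with the same value and timestamp but no headers, whereas a direct join result keeps only the triggering side's `trace-id`.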
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index 8fd9f6193ef..fd3be2e2a9c 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -95,6 +95,35 @@ Storing headers increases disk and serialization cost versus headerless stores;
`TopologyTestDriver` and Interactive Queries support the new store types. The
existing `store()` facades continue to return values (or `ValueAndTimestamp`)
without exposing record headers. See the [interactive queries
guide](/{version}/streams/developer-guide/interactive-queries/#header-aware-stores-interactive-queries).
+### Headers-Aware State Stores for DSL Operators (KIP-1285)
+
+[KIP-1285](https://cwiki.apache.org/confluence/x/4ow8G) lets DSL operators use
the headers-aware state stores introduced by
[KIP-1271](#kip-1271-headers-aware-stores). Set
[`dsl.store.format=HEADERS`](/{version}/streams/developer-guide/config-streams.html#dsl-store-format)
to use headers-aware stores for supported DSL operators. These stores can keep
record headers together with the value and timestamp.
+
+The config only chooses the state store format. It does not define how DSL
operators create headers for output records; see [Current
limitations](#current-limitations) below.
+
+```java
+// Enable headers-aware stores globally for all DSL operators
+Properties props = new Properties();
+props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HEADERS");
+```
+
+Per-operator customization is possible by providing a custom
`DslStoreSuppliers` via `Materialized.withStoreType(...)`, or by supplying
explicit headers-aware store suppliers. The pre-existing `boolean
isTimestamped` constructors and `isTimestamped()` methods on
`DslKeyValueParams` and `DslWindowParams`, and the 3-argument constructor on
`DslSessionParams`, are deprecated in favor of `DslStoreFormat`-based
constructors. Existing applications are not affected by default.
+
+#### Current limitations {#current-limitations}
+
+Today, DSL result headers behave as follows:
+
+* Aggregations (`count`, `reduce`, `aggregate`, including their windowed and
session-windowed variants), KTable-KTable joins (inner / left / outer),
materialized `KTable.mapValues`, `KStream.toTable()`, and
`StreamsBuilder.table()` write empty headers to their materialized stores.
+* KStream-KStream join window stores keep source-record headers, but join
result records do not get computed or merged headers. They may carry the
headers from the record that triggered the result.
+* `suppress()` and left/outer stream-stream joins use non-headers-aware buffer
stores. Records that pass through those buffers lose their headers.
+
+A follow-up KIP will give users explicit control over how DSL result headers
are computed. See [Stateful
transformations](/{version}/streams/developer-guide/dsl-api.html#stateful-transformations)
for more details.
+
+#### Changelog, migration, and performance
+
+KIP-1285 does not change the changelog wire format, the migration procedure,
or the per-store overhead — those are properties of the underlying KIP-1271
stores. See the [KIP-1271 section](#kip-1271-headers-aware-stores) above for
the full description of changelog compatibility, the lazy per-key RocksDB
migration on `DEFAULT`→`HEADERS`, the restore behavior, and the per-record size
impact. The DSL config `dsl.store.format` only controls which operators
participate; once an operator is usi [...]
+
+
### Deprecation of streams-scala module (KIP-1244)
The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is
deprecated in 4.3.0 and will be removed in 5.0.
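The KIP-1285 "Changelog, migration, and performance" text in the upgrade-guide hunk above refers to a lazy per-key RocksDB migration when switching from `DEFAULT` to `HEADERS`. A toy sketch of the general idea, using `HashMap` instead of RocksDB and assuming that a migrated entry receives empty headers. `LazyMigratingStore` and its entry types are hypothetical and do not reflect the KIP-1271 on-disk format or restore path:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class LazyMigratingStore {
    // Hypothetical legacy entry: value and timestamp only.
    static final class Legacy {
        final String value;
        final long timestamp;

        Legacy(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical headers-aware entry: headers, value, and timestamp.
    static final class WithHeaders {
        final Map<String, String> headers;
        final String value;
        final long timestamp;

        WithHeaders(Map<String, String> headers, String value, long timestamp) {
            this.headers = headers;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Legacy> legacy = new HashMap<>();
    private final Map<String, WithHeaders> upgraded = new HashMap<>();

    // Simulates data written before the format switch.
    void putLegacy(String key, String value, long timestamp) {
        legacy.put(key, new Legacy(value, timestamp));
    }

    // New writes always use the headers-aware format and drop any stale legacy copy.
    void put(String key, Map<String, String> headers, String value, long timestamp) {
        upgraded.put(key, new WithHeaders(new HashMap<>(headers), value, timestamp));
        legacy.remove(key);
    }

    // A read migrates a legacy entry on first access (assumption: it gets empty headers).
    WithHeaders get(String key) {
        WithHeaders current = upgraded.get(key);
        if (current != null) {
            return current;
        }
        Legacy old = legacy.remove(key);
        if (old == null) {
            return null;
        }
        WithHeaders migrated = new WithHeaders(Collections.<String, String>emptyMap(), old.value, old.timestamp);
        upgraded.put(key, migrated);
        return migrated;
    }

    // Number of keys still stored in the legacy format.
    int legacyCount() {
        return legacy.size();
    }
}
```

The design point the sketch illustrates is that no bulk rewrite happens at switch time: each key moves to the new format only when it is next read or written, so untouched keys stay in the legacy format.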
@@ -552,4 +581,3 @@ The following table shows which versions of the Kafka Streams API are compatible
* [Documentation](/documentation)
* [Kafka Streams](/documentation/streams)
-