Hey Matthias,

Thanks for the feedback and insightful questions.

MJS1: The header length is encoded in a fixed 2-byte field, which allows up to 64 KB of header data (as a byte[]). This is a hard limit, so we can remove the configuration. I had originally considered supporting larger, configurable headers via this setting.

MJS2: It is needed for `ReadOnlySessionStoreWithHeader` (analogous to `ReadOnlySessionStore`). I updated the KIP. @Frank Chi <[email protected]> can explain it better.

MJS3: I updated the KIP with your comment.

MJS4: I assume it is doable. However, users who call these APIs directly would have to update their application code.

On Mon, Jan 12, 2026 at 2:39 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the KIP Alieh. Very exciting to see this getting started.
> It's long standing gap in Kafka Streams.
>
> Couple of questions:
>
> MJS1: Why do we need `streams.store.headers.max.bytes` config? We
> currently do not limit the size of the key or value inside KS layer, and
> I don't see why we would need a bound for headers? Of course, there is
> Kafka configs which limit the size of a message, and from my
> understanding it would implicitly also cover headers. So I am wondering
> what we gain by this config?
>
> MJS2: What is the purpose of `AggregationWithHeaders` class?
>
> MJS3: You propose to add a new `HeaderByteStore` interface (similar to
> `TimestampedBytesStore`), but I believe its purpose is not just to
> provide a static method to convert the format. I believe it must also be
> used as a "marker interface", to tell the runtime if the
> internal/underlying RocksDBStore is a "headers store" or not (this is
> necessary for backward compatibility, and upgrading).
> This change implies that if a user implements a custom store, they would need to
> also implement this interface if the store supports headers (and can be
> plugging into the DSL as "header store" -- of course, DSL support is a
> future KIP only, but I think we should already cover this part correctly
> from day one on, to ensure we don't hit issue down the road). If a
> (byte) store does not implement the `HeaderBytesStore` interface, it
> means that the returned value-byte do not have the propose
> "header+payload" format. The KIP should explain this in more details.
>
> MJS4: For `StateSerdes`, I am wondering if we should deprecate the
> existing methods for which new overloads including a `Headers` parameter
> are added? If a store does not support headers, it would just ignore
> this new parameter, and we could remove the existing methods with 5.0
> (?) release and keep the API surface area smaller. Or do we still need
> the existing methods?
>
> -Matthias
>
> On 1/9/26 5:20 PM, Chia-Ping Tsai wrote:
> > hi Alieh
> >
> > Thanks for the KIP. This proposal seems to open the door for many interesting use cases. I have a few questions?
> >
> > chia_0: could you clarify the serialization format of headers_bytes?
> >
> > chia_1: how does the state store distinguish between legacy values and new values with headers? Since the new format starts with a 2-bytes length, is there a risk of ambiguity with existing data?
> >
> > chia_2: does the implementation guarantee that the order of headers is preserved
> >
> > Best,
> > Chia-Ping
> >
> >> On Jan 10, 2026, at 6:14 AM, Alieh Saeedi via dev <[email protected]> wrote:
> >>
> >> Hi all,
> >>
> >> I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
> >> Streams state stores to preserve record headers.
> >> This would let header-based metadata like schema IDs, tracing info, and
> >> feature flags be stored and restored alongside values.
> >> The KIP introduces header-aware store types and a small config to cap the
> >> size of headers written into state.
> >> Details are in the KIP:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> >> I’d appreciate your feedback and questions on the proposal.
> >>
> >> Thanks,
> >> Alieh
> >
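
For readers following MJS1 and chia_1, here is a rough sketch of how a 2-byte length prefix caps the headers portion of a stored value. It is only an illustration, not the KIP's implementation: the class and method names are hypothetical, the layout `[2-byte length][headers bytes][payload bytes]` is assumed from the "header+payload" wording in this thread, the length is assumed to be read as an unsigned 16-bit value, and the headers are treated as opaque bytes because the actual serialization of headers_bytes is exactly what chia_0 asks about.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of Kafka Streams or the KIP.
final class HeaderLengthPrefixSketch {

    // Largest value an unsigned 2-byte length field can represent.
    static final int MAX_HEADERS_BYTES = 0xFFFF;

    // Assumed layout: [2-byte length][headers bytes][payload bytes].
    static byte[] encode(byte[] headers, byte[] payload) {
        if (headers.length > MAX_HEADERS_BYTES) {
            throw new IllegalArgumentException(
                "headers are " + headers.length + " bytes, limit is " + MAX_HEADERS_BYTES);
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + headers.length + payload.length);
        buf.putShort((short) headers.length); // low 16 bits, read back as unsigned
        buf.put(headers);
        buf.put(payload);
        return buf.array();
    }

    // Reads the 2-byte prefix and widens it to an unsigned int.
    static int headersLength(byte[] stored) {
        return ByteBuffer.wrap(stored).getShort() & 0xFFFF;
    }

    static byte[] headers(byte[] stored) {
        return Arrays.copyOfRange(stored, 2, 2 + headersLength(stored));
    }

    static byte[] payload(byte[] stored) {
        return Arrays.copyOfRange(stored, 2 + headersLength(stored), stored.length);
    }

    public static void main(String[] args) {
        byte[] stored = encode(new byte[] {1, 2, 3}, new byte[] {9, 9});
        System.out.println("stored bytes: " + stored.length);
        System.out.println("headers length: " + headersLength(stored));
        System.out.println("payload: " + Arrays.toString(payload(stored)));
    }
}
```

Note that a prefix like this does not by itself answer chia_1: a legacy value whose first two bytes happen to look like a plausible length would decode without error, which is why a marker interface on the store (MJS3) or some other out-of-band signal is needed to tell the two formats apart.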
