Hey Mattias,

Thanks for the feedback and insightful questions.

MJS1: The maximum header length is fixed at 2 bytes, which corresponds to
64 KB of headers data (in byte[]). This is a hard limit, so we can remove
the configuration. I had originally considered supporting larger,
configurable headers via this setting.
MJS2: It is needed for `ReadOnlySessionStoreWithHeader` (analogous to
`ReadOnlySessionStore`).
I updated the KIP. @Frank Chi <[email protected]> can explain it better. MJS3:
I updated the KIP with your comment.
MJS4: I assume it is doable. However, the users calling such APIs directly,
have to update their app code.

On Mon, Jan 12, 2026 at 2:39 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the KIP Alieh. Very exciting to see this getting started.
> It's long standing gap in Kafka Streams.
>
>
> Couple of questions:
>
>
> MJS1: Why do we need `streams.store.headers.max.bytes` config? We
> currently do not limit the size of the key or value inside KS layer, and
> I don't see why we would need a bound for headers? Of course, there is
> Kafka configs which limit the size of a message, and from my
> understanding it would implicitly also cover headers. So I am wondering
> what we gain by this config?
>
>
> MJS2: What is the purpose of `AggregationWithHeaders` class?
>
>
> MJS3: You propose to add a new `HeaderByteStore` interface (similar to
> `TimestampedBytesStore`), but I believe its purpose is not just to
> provide a static method to convert the format. I believe it must also be
> used as a "marker interface", to tell the runtime if the
> internal/underlying RocksDBStore is a "headers store" or not (this is
> necessary for backward compatibility, and upgrading). This change
> implies that if a user implements a custom store, they would need to
> also implement this interface if the store supports headers (and can be
> plugging into the DSL as "header store" -- of course, DSL support is a
> future KIP only, but I think we should already cover this part correctly
> from day one on, to ensure we don't hit issue down the road). If a
> (byte) store does not implement the `HeaderBytesStore` interface, it
> means that the returned value-byte do not have the propose
> "header+payload" format. The KIP should explain this in more details.
>
>
>
> MJS4: For `StateSerdes`, I am wondering if we should deprecate the
> existing methods for which new overloads including a `Headers` parameter
> are added? If a store does not support headers, it would just ignore
> this new parameter, and we could remove the existing methods with 5.0
> (?) release and keep the API surface area smaller. Or do we still need
> the existing methods?
>
>
> -Matthias
>
>
>
> On 1/9/26 5:20 PM, Chia-Ping Tsai wrote:
> > hi Alieh
> >
> > Thanks for the KIP. This proposal seems to open the door for many
> interesting use cases. I have a few questions?
> >
> > chia_0: could you clarify the serialization format of headers_bytes?
> >
> > chia_1:  how does the state store distinguish between legacy values and
> new values with headers? Since the new format starts with a 2-bytes length,
> is there a risk of ambiguity with existing data?
> >
> > chia_2: does the implementation guarantee that the order of headers is
> preserved
> >
> > Best,
> > Chia-Ping
> >
> >
> >
> >> Alieh Saeedi via dev <[email protected]> 於 2026年1月10日 清晨6:14 寫道:
> >>
> >> Hi all,
> >>
> >> I’d like to start a discussion on KIP-1271, which proposes allowing
> Kafka
> >> Streams state stores to preserve record headers.
> >> This would let header-based metadata like schema IDs, tracing info, and
> >> feature flags be stored and restored alongside values.
> >> The KIP introduces header-aware store types and a small config to cap
> the
> >> size of headers written into state.
> >> Details are in the KIP:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> >> .
> >> I’d appreciate your feedback and questions on the proposal.
> >>
> >> Thanks,
> >> Alieh
>
>

Reply via email to