Thanks for the KIP Alieh. Very exciting to see this getting started. It's long standing gap in Kafka Streams.

Couple of questions:


MJS1: Why do we need `streams.store.headers.max.bytes` config? We currently do not limit the size of the key or value inside KS layer, and I don't see why we would need a bound for headers? Of course, there is Kafka configs which limit the size of a message, and from my understanding it would implicitly also cover headers. So I am wondering what we gain by this config?


MJS2: What is the purpose of `AggregationWithHeaders` class?


MJS3: You propose to add a new `HeaderByteStore` interface (similar to `TimestampedBytesStore`), but I believe its purpose is not just to provide a static method to convert the format. I believe it must also be used as a "marker interface", to tell the runtime if the internal/underlying RocksDBStore is a "headers store" or not (this is necessary for backward compatibility, and upgrading). This change implies that if a user implements a custom store, they would need to also implement this interface if the store supports headers (and can be plugging into the DSL as "header store" -- of course, DSL support is a future KIP only, but I think we should already cover this part correctly from day one on, to ensure we don't hit issue down the road). If a (byte) store does not implement the `HeaderBytesStore` interface, it means that the returned value-byte do not have the propose "header+payload" format. The KIP should explain this in more details.



MJS4: For `StateSerdes`, I am wondering if we should deprecate the existing methods for which new overloads including a `Headers` parameter are added? If a store does not support headers, it would just ignore this new parameter, and we could remove the existing methods with 5.0 (?) release and keep the API surface area smaller. Or do we still need the existing methods?


-Matthias



On 1/9/26 5:20 PM, Chia-Ping Tsai wrote:
hi Alieh

Thanks for the KIP. This proposal seems to open the door for many interesting 
use cases. I have a few questions?

chia_0: could you clarify the serialization format of headers_bytes?

chia_1:  how does the state store distinguish between legacy values and new 
values with headers? Since the new format starts with a 2-bytes length, is 
there a risk of ambiguity with existing data?

chia_2: does the implementation guarantee that the order of headers is preserved

Best,
Chia-Ping



Alieh Saeedi via dev <[email protected]> 於 2026年1月10日 清晨6:14 寫道:

Hi all,

I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
Streams state stores to preserve record headers.
This would let header-based metadata like schema IDs, tracing info, and
feature flags be stored and restored alongside values.
The KIP introduces header-aware store types and a small config to cap the
size of headers written into state.
Details are in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
.
I’d appreciate your feedback and questions on the proposal.

Thanks,
Alieh

Reply via email to