Thanks for clarifying.

MJS1: If we agree that we don't need the config, can you remove it from the KIP?

Follow up to MJS1: I am wondering if using 2 bytes for the header length could be limiting, or why it's actually necessary? While 64KB of header payload does not sound small, there might be corner cases for which larger headers are required? But I am not sure why we would need to store the size explicitly (looking into `DefaultRecord.java` I don't see such a field either)? It seems we need to read and process `headers_bytes` anyway, and as we have the "header count" field, we should be able to process `headers_bytes` w/o the need to know its size in number of bytes?
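
Just to illustrate what I have in mind (a rough sketch only -- the per-header key/value length fields are my assumption, not taken from the KIP): the header count alone seems sufficient to drive deserialization, similar to how `DefaultRecord` reads headers:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

// hypothetical layout: [headerCount][keyLen][keyBytes][valueLen][valueBytes]...[payload]
static RecordHeaders readHeaders(final ByteBuffer buffer) {
    final RecordHeaders headers = new RecordHeaders();
    final int count = buffer.getInt();                  // "header count" field; no total byte size needed
    for (int i = 0; i < count; i++) {
        final byte[] key = new byte[buffer.getShort()]; // per-header key length
        buffer.get(key);
        final int valueLength = buffer.getInt();        // -1 could mark a null header value
        byte[] value = null;
        if (valueLength >= 0) {
            value = new byte[valueLength];
            buffer.get(value);
        }
        headers.add(new RecordHeader(new String(key, StandardCharsets.UTF_8), value));
    }
    return headers;                                     // buffer.position() now marks the start of the payload
}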


Side comment for "chia_2". Yes, we need to preserve order. Headers are a list of key-value pairs (it's not a map -- keys are also not unique).
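For example, with the existing `Headers` API duplicate keys are legal, and iteration order must be retained:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

Headers headers = new RecordHeaders();
headers.add("schema.id", "1".getBytes());
headers.add("trace", "a".getBytes());
headers.add("trace", "b".getBytes());   // duplicate key is legal
// iterating must yield: schema.id, trace=a, trace=b (in this order)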


MJS2: Ah, I understand the purpose now. But not sure if I understand the class hierarchy yet? For timestamped stores, we only introduced

interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>>

but no `ReadOnlyTimestampedKeyValueStore`. The KIP now mentions two new interfaces

interface SessionStoreWithHeaders<K, AGG> extends StateStore

and

interface ReadOnlySessionStoreWithHeaders<K, AGG> extends ReadOnlySessionStore<K, AggregationWithHeaders<AGG>>

So I am wondering why we need both, or why it won't be sufficient to only add

interface SessionStoreWithHeaders<K, AGG> extends ReadOnlySessionStore<K, AggregationWithHeaders<AGG>>

similar to what the KIP proposes for KV- and Windowed-Store?


MJS2-B: Why does `VersionedKeyValueStoreWithHeaders` not extend `VersionedKeyValueStore` ?



Side comment question about "chia_1": Isn't the question asking about the upgrade path? The KIP mentions "lazy migration" from the old format to the new format. But it's not explained how, when we read a row from RocksDB, we know whether the row is in the old or the new format?
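
To make the concern concrete (just an illustration, assuming the "2-byte length prefix" format Chia-Ping mentioned):

import java.nio.ByteBuffer;

// a row written by an old version: just the serialized payload, no headers prefix
final byte[] legacyRow = new byte[] {0x00, 0x10 /* ...rest of the old payload... */};

// the new read path would interpret the first two bytes as the headers length
final int headersLength = Short.toUnsignedInt(ByteBuffer.wrap(legacyRow).getShort()); // 16
// ...but for a legacy row this just re-interprets the first two payload bytes,
// so the bytes alone don't tell us which format we are looking at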



MJS3: Thanks. -- Reading the section "Compatibility, Deprecation, and Migration Plan", I am not sure if I fully understand.

If users provide a custom XxxByteStoreSupplier no upgrade happens (but users can opt-in implementing HeaderBytesStore interface) but the old format is kept. We use a proxy store that removes/adds headers on read/write.

If there is a custom store which does not implement `HeaderBytesStore`, it implies that the store does not support headers, and the old format is used. This makes sense. But why would we need a proxy store for this case? -- I would understand that we need a proxy store for the case that we change the DSL to use the new header format, and a non-header store supplier is passed. But DSL support is out-of-scope for now, so I am not sure what the purpose of a proxy would be?
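
Just so we are talking about the same thing, my mental model of such a proxy is that it only translates at the byte level on the read/write boundary, roughly like this (a sketch only; the 2-byte-length layout is taken from the discussion above, and the method names are made up):

import java.nio.ByteBuffer;

// on write: the caller hands us "new format" bytes, the wrapped store expects the old format
static byte[] stripHeadersOnWrite(final byte[] valueWithHeaders) {
    final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
    final int headersLength = Short.toUnsignedInt(buffer.getShort());
    buffer.position(buffer.position() + headersLength);      // drop the headers bytes
    final byte[] plainValue = new byte[buffer.remaining()];
    buffer.get(plainValue);
    return plainValue;
}

// on read: the wrapped store returns "old format" bytes, the caller expects the new format
static byte[] addEmptyHeadersOnRead(final byte[] plainValue) {
    final ByteBuffer buffer = ByteBuffer.allocate(2 + plainValue.length);
    buffer.putShort((short) 0);                               // zero-length headers section
    buffer.put(plainValue);
    return buffer.array();
}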

Also not sure if I fully understand

The HeaderBytesStore interface will be implemented by Kafka Streams for the newly added 
stores. Only users who implement custom key-value/window/session stores are not affected, 
as their stores will be wrapped by the proxy store. User can "opt-in" of 
course, and change their custom key-value/window/session store implementation to support 
the new headers-aware interfaces by implementing HeaderBytesStore in their own store 
classes. For other user implemented stores that add a new StateStore type, nothing 
changes for existing code.



MJS4: Looking into the code in more detail, I am actually wondering if changing `StateSerdes` would impact users? It seems to be only used internally, and it's actually not clear to me why it's public API to begin with? Should we maybe even deprecate the whole class so we can move it to an internal package, where it seems to belong? Or am I missing something, and there is a reason why it's public API?



-Matthias


On 1/12/26 1:07 PM, Alieh Saeedi via dev wrote:
Hey Matthias,

Thanks for the feedback and insightful questions.

MJS1: The maximum header length is fixed at 2 bytes, which corresponds to
64 KB of headers data (in byte[]). This is a hard limit, so we can remove
the configuration. I had originally considered supporting larger,
configurable headers via this setting.
MJS2: It is needed for `ReadOnlySessionStoreWithHeader` (analogous to
`ReadOnlySessionStore`). I updated the KIP. @Frank Chi <[email protected]> can
explain it better.

MJS3: I updated the KIP with your comment.
MJS4: I assume it is doable. However, users calling such APIs directly
would have to update their app code.

On Mon, Jan 12, 2026 at 2:39 AM Matthias J. Sax <[email protected]> wrote:

Thanks for the KIP Alieh. Very exciting to see this getting started.
It's a long-standing gap in Kafka Streams.


Couple of questions:


MJS1: Why do we need the `streams.store.headers.max.bytes` config? We
currently do not limit the size of the key or value inside the KS layer, and
I don't see why we would need a bound for headers? Of course, there are
Kafka configs which limit the size of a message, and from my
understanding they would implicitly also cover headers. So I am wondering
what we gain by this config?


MJS2: What is the purpose of `AggregationWithHeaders` class?


MJS3: You propose to add a new `HeaderBytesStore` interface (similar to
`TimestampedBytesStore`), but I believe its purpose is not just to
provide a static method to convert the format. I believe it must also be
used as a "marker interface", to tell the runtime if the
internal/underlying RocksDBStore is a "headers store" or not (this is
necessary for backward compatibility, and upgrading). This change
implies that if a user implements a custom store, they would need to
also implement this interface if the store supports headers (and can be
plugged into the DSL as a "header store" -- of course, DSL support is a
future KIP only, but I think we should already cover this part correctly
from day one, to ensure we don't hit issues down the road). If a
(byte) store does not implement the `HeaderBytesStore` interface, it
means that the returned value bytes do not have the proposed
"header+payload" format. The KIP should explain this in more detail.



MJS4: For `StateSerdes`, I am wondering if we should deprecate the
existing methods for which new overloads including a `Headers` parameter
are added? If a store does not support headers, it would just ignore
this new parameter, and we could remove the existing methods with the 5.0
(?) release and keep the API surface area smaller. Or do we still need
the existing methods?


-Matthias



On 1/9/26 5:20 PM, Chia-Ping Tsai wrote:
hi Alieh

Thanks for the KIP. This proposal seems to open the door for many
interesting use cases. I have a few questions:

chia_0: could you clarify the serialization format of headers_bytes?

chia_1: how does the state store distinguish between legacy values and
new values with headers? Since the new format starts with a 2-byte length,
is there a risk of ambiguity with existing data?

chia_2: does the implementation guarantee that the order of headers is
preserved?

Best,
Chia-Ping



Alieh Saeedi via dev <[email protected]> wrote on January 10, 2026 at 6:14 AM:

Hi all,

I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
Streams state stores to preserve record headers. This would let
header-based metadata like schema IDs, tracing info, and feature flags be
stored and restored alongside values. The KIP introduces header-aware
store types and a small config to cap the size of headers written into
state.

Details are in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
I’d appreciate your feedback and questions on the proposal.

Thanks,
Alieh



