Hi Matthias and Lucas,

Thank you for the feedback. I have updated the KIP based on our discussion.
Here are the specific updates and clarifications:
1. Header Size and Format

I have removed the two-byte header size and its associated configuration
property. Since we are using varints for the header count, as well as for
individual key and value sizes within each header, a global "headers size"
field is redundant and could be restrictive.
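
To make the layout concrete, here is a minimal sketch of a varint-prefixed header encoding. It is only an illustration: the class and method names (HeaderCodecSketch, Header, serialize, deserialize) are hypothetical, the varint is a plain unsigned 7-bits-per-byte encoding (the KIP may settle on a different varint flavor), and header values are assumed to be non-null.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the KIP's actual serializer.
final class HeaderCodecSketch {

    /** Hypothetical header holder: a key plus a non-null value. */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Writes a non-negative int as an unsigned varint (7 payload bits per byte). */
    static void writeVarint(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value);
    }

    /** Reads an unsigned varint of at most 5 bytes. */
    static int readVarint(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }

    /** Layout: [count][keyLen][key][valueLen][value]... with no global size field. */
    static byte[] serialize(List<Header> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Header h : headers) { // list order is preserved on disk
            byte[] key = h.key.getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            writeVarint(h.value.length, out);
            out.write(h.value, 0, h.value.length);
        }
        return out.toByteArray();
    }

    static List<Header> deserialize(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        int count = readVarint(in);
        List<Header> headers = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] key = new byte[readVarint(in)];
            in.get(key);
            byte[] value = new byte[readVarint(in)];
            in.get(value);
            headers.add(new Header(new String(key, StandardCharsets.UTF_8), value));
        }
        return headers;
    }
}
```

Because every length is self-describing, a reader can walk the headers without knowing their total size up front, which is why the separate two-byte size field adds nothing.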
2. Session Store Interfaces

Regarding the inclusion of both ReadOnlySessionStoreWithHeaders and
SessionStoreWithHeaders: In KIP-258, no ReadOnly variant was needed for
Timestamped stores because of how they were integrated. However, since
Kafka Streams currently lacks a TimestampedSessionStore, the interfaces
introduced here are modeled after the existing SessionStore and
ReadOnlySessionStore for consistency. I am open to removing the ReadOnly
variant if it is considered over-engineering; I’ll look to Frank for
further input on this specific architectural choice.
3. Versioned Store Hierarchy

On the question of why VersionedKeyValueStoreWithHeaders does not extend
VersionedKeyValueStore: While we could implement long put(K key, V value,
long timestamp), it wasn't immediately clear if the inheritance provided a
strong benefit for this specific header-aware use case. I would appreciate
your thoughts on whether you see this inheritance as a requirement for API
consistency.
4. Upgrade Path and Versioning

I have clarified the upgrade path in the KIP. To address Lucas’s point
about magic/version bytes: adding a version byte to every record introduces
significant storage overhead. Instead, I’ve updated the proposal to follow
the KIP-258 approach, utilizing a new Column Family in RocksDB for the
header-aware format. This isolates the data and allows for a cleaner
migration. Please see the updated "Upgrade Path" subsection for details.
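
As a rough illustration of the idea, the sketch below simulates two column families with in-memory maps and migrates entries lazily on read. Everything in it is an assumption made for illustration: the class name DualColumnFamilySketch, the String keys, and the upgrade() conversion (which simply prefixes a legacy value with a header count of zero). It does not use the RocksDB API and is not the actual store implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: two maps stand in for two RocksDB column families.
final class DualColumnFamilySketch {

    final Map<String, byte[]> legacyCf = new HashMap<>();  // old format, no headers
    final Map<String, byte[]> headersCf = new HashMap<>(); // new header-aware format

    /** Assumed conversion: prepend an empty headers section (single byte 0 = zero headers). */
    static byte[] upgrade(byte[] legacyValue) {
        byte[] upgraded = new byte[legacyValue.length + 1];
        upgraded[0] = 0;
        System.arraycopy(legacyValue, 0, upgraded, 1, legacyValue.length);
        return upgraded;
    }

    /** Reads from the new column family first, migrating a legacy entry if one is found. */
    byte[] get(String key) {
        byte[] value = headersCf.get(key);
        if (value != null) {
            return value;
        }
        byte[] legacy = legacyCf.remove(key);
        if (legacy == null) {
            return null;
        }
        byte[] upgraded = upgrade(legacy);
        headersCf.put(key, upgraded);
        return upgraded;
    }

    /** Writes always go to the new column family; any stale legacy entry is dropped. */
    void put(String key, byte[] valueWithHeaders) {
        legacyCf.remove(key);
        headersCf.put(key, valueWithHeaders);
    }
}
```

The point of the separation is that the format of a value is implied by where it lives, so no per-record version byte is needed.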
5. StateSerdes Public API

While StateSerdes is public, I understand it is rarely used by developers.
However, it remains a helpful utility for power users implementing custom
StateStore types for external databases. BTW, if we decide it should be
internal, we can deprecate it in this KIP.
6. Performance & Lazy Deserialization

Thank you, Lucas, for raising the point about Iterators. I have added a
"Performance Considerations" subsection. While the varint-based format
requires scanning to find payload offsets, we can implement a "semi-lazy"
approach. This allows us to scan varints to determine offsets without the
high GC overhead of allocating objects for data that may not be accessed
during a range scan.
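
A minimal sketch of what I mean by "semi-lazy", under assumptions: the stored bytes are laid out as [headers][payload], the varint is a plain unsigned 7-bits-per-byte encoding, and the names (LazyHeaderScanSketch, payloadOffset) are hypothetical. The scan only advances a position; it never copies key or value bytes or builds header objects.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an offset-only scan over a serialized headers section.
final class LazyHeaderScanSketch {

    /** Reads an unsigned varint of at most 5 bytes. */
    static int readVarint(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }

    /** Returns the index of the first byte after the headers section. */
    static int payloadOffset(byte[] stored) {
        ByteBuffer in = ByteBuffer.wrap(stored); // a view over the array, not a copy
        int count = readVarint(in);
        for (int i = 0; i < count; i++) {
            int keyLen = readVarint(in);
            in.position(in.position() + keyLen);   // skip key bytes
            int valueLen = readVarint(in);
            in.position(in.position() + valueLen); // skip value bytes
        }
        return in.position();
    }
}
```

An iterator could call something like payloadOffset() to hand out the value bytes directly and only materialize headers when a caller actually asks for them.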
7. Changelog Interaction

To clarify: record headers will be preserved in the changelog topic as
native Kafka record headers.

Bests,
Alieh

On Tue, Jan 13, 2026 at 7:10 PM TengYao Chi <[email protected]> wrote:

> Hi Matthias,
>
> From my understanding (please correct me if I am wrong):
> KV/Window Stores: Use ValueTimestampHeaders<V> because they store
> individual records that always require a timestamp.
>
> Session Stores: Use AggregationWithHeaders<AGG> because the AGG result is
> generic (e.g., Long, String, or POJO), and session boundaries are managed
> by the store rather than the value wrapper.
>
> Additionally, during session merges, the application logic must decide
> which headers to persist. This wrapper provides a clean way to store that
> custom-processed metadata. The downside is that the user must explicitly
> handle how to merge headers.
>
> I am also wondering if we could just use ValueTimestampHeaders directly to
> simplify the API?
>
> Best,
> TengYao
>
> On 2026/01/12 18:44:54 Alieh Saeedi via dev wrote:
> > Hey Chia-Ping,
> >
> > Thanks for the feedback.
> >
> > chia_0: the headers must be serialized as a byte array. I updated the KIP
> > accordingly.
> > chia_1: this is the reason different store types are introduced. The newly
> > introduced stores keep the headers, while the current ones do not.
> > chia_2: that’s a valid point. I think it’s better to keep the order rather
> > than change it.
> >
> >
> > Bests,
> > Alieh
> >
> > On Sat, Jan 10, 2026 at 2:21 AM Chia-Ping Tsai <[email protected]> wrote:
> >
> > > hi Alieh
> > >
> > > Thanks for the KIP. This proposal seems to open the door for many
> > > interesting use cases. I have a few questions.
> > >
> > > chia_0: could you clarify the serialization format of headers_bytes?
> > >
> > > chia_1: how does the state store distinguish between legacy values and
> > > new values with headers? Since the new format starts with a 2-byte length,
> > > is there a risk of ambiguity with existing data?
> > >
> > > chia_2: does the implementation guarantee that the order of headers is
> > > preserved?
> > >
> > > Best,
> > > Chia-Ping
> > >
> > >
> > >
> > > > Alieh Saeedi via dev <[email protected]> wrote on January 10, 2026, at 6:14 AM:
> > > >
> > > > Hi all,
> > > >
> > > > I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
> > > > Streams state stores to preserve record headers.
> > > > This would let header-based metadata like schema IDs, tracing info, and
> > > > feature flags be stored and restored alongside values.
> > > > The KIP introduces header-aware store types and a small config to cap the
> > > > size of headers written into state.
> > > > Details are in the KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> > > > I’d appreciate your feedback and questions on the proposal.
> > > >
> > > > Thanks,
> > > > Alieh
> > >
> >
>
