Hey Matthias,

Thanks for all the good points you raised.

MJS-5: It's good that we walked through all the possible options. Reordering the bytes to match the Kafka message layout initially seemed like a nice idea, but as you pointed out, there's no real benefit, and we might even end up with slightly longer values, since payload_size would usually need more varint bytes than headers_size. In both formats we still have to read a varint and compute an offset to retrieve the value. I also updated the KIP and added a few sentences to clarify why we store headers_size in the first place (it is what enables lazy deserialization).

MJS-6: Yes, KIP-258 already applied the same pattern for window/session stores. KV stores use dual column families with lazy per-entry migration, while window/session stores make a clean break at the segment level: old segments stay in the legacy format and new ones use the new format. Segment-level versioning maps well to windows/sessions because they're already time-segmented and constrained by retention, so we avoid dual-CF complexity in every small segment DB while still getting a natural rolling upgrade as old segments age out. Compared with a dual-CF "dual accessor" approach for window/session stores, the clean break is less code, easier to reason about, and reduces RocksDB overhead. The trade-off is that legacy segments never get backfilled with new metadata unless one explicitly rebuilds or migrates the state. I added some clarification to the KIP on that point as well.

MJS-7 and MJS-8: Great catches. I've updated the Compatibility and Testing sections accordingly.

Best,
Alieh

On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:

> Great discussion! Seems we are heading into the right direction.
>
> Thanks for clarifying the open question about the header serialization
> format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.
>
> A few follow up questions:
>
> MJS-5: As we are keeping `headers_size` now, I am wondering if there
> would be a benefit to change the byte format to the same order as used
> in Kafka messages, ie
>
> [payload_size][payload][headers_byte]
>
> The only disadvantage I see would be, that I expect `header_size` to be
> smaller than `payload_size` for most cases, so we might need a little
> bit more space on average for the var-int encoding. But in both cases,
> we would be able to implement lazy deserialization. Not saying we have
> to do it this way -- in general I agree there is not much benefit to use
> the same order as Kafka messages do as it was already pointed out. Just
> wanted to mention it for completeness. Thoughts?
>
> MJS-5-B: One request though: the KIP should explain why we need to add
> `header_size` (or `payload_size` in case we really make this change).
> Reading the KIP as-is, I would always ask myself why we would need
> `header_size` -- so mentioning lazy deserialization explicitly as reason
> why we add this field would be great to not puzzle readers about it. --
> The KIP mentions lazy-deserialization later in the "Compatibility"
> section, but does not make the connection to `header_size` field
> explicit in this section either.
>
> MJS-6. For the upgrade path the KIP mentions
>
> > Window/Session: Employs a clean break at the segment level—old segments
> > stay as-is; new segments use the new format.
>
> I am wondering why we do it this way? Did KIP-258 also do this (I cannot
> remember). It's an interesting idea. I am just wondering about pros/cons
> compared to follow the same dual-cf-accessor path as we do for
> non-windowed stores. Also from an implementation POV -- would it be more
> or less code to write?
>
> MJS-7. In the "Compatibility" section the KIP states
>
> > Backward Compatibility
> >
> > - Public API: No existing APIs are deprecated. The new header-aware
> >   interfaces and factory methods are additive.
>
> As we deprecate some methods on `StateSerdes` now, this is not correct
> any longer and should be updated.
>
> MJS-8: Testing. -- There is no mentioning of system tests. And maybe we
> don't need any. But might be good to be explicit. Did KIP-258 add new
> system tests?
>
> @TengYao: Yes, your understanding of KS/Windowed vs Session store is
> correct. It's really all about the optimization to avoid storing "event
> time" for sessions twice, as we know "event time == window-end". That's
> why using `ValueTimestampHeaders` for header-session store might not be
> ideal, as we would lose this optimization. Introducing
> `AggregationWithHeaders` is an attempt to keep this optimization though.
>
> -Matthias
>
> On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
> > Updates to KIP
> >
> > 1- A varint header_size field is introduced to enable lazy
> > deserialization when scanning large ranges.
> >
> > 2- The current serialization/deserialization methods in StateSerdes are
> > marked as deprecated to keep the class concise.
> >
> > 3- Note that VersionedKeyValueStoreWithHeaders cannot extend
> > VersionedKeyValueStore because their methods differ in input and/or
> > output types. In particular, the VersionedRecord returned by
> > VersionedKeyValueStore methods is a final class and therefore cannot be
> > subclassed.
> >
> > Thanks,
> > Alieh
> >
> > On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> wrote:
> >
> >> chia_03: Regarding the header size, using a Varint is consistent with
> >> Kafka's serialization standards. It avoids the overhead of a large
> >> fixed-size field while still achieving the efficient skipping
> >> capability we want.
> >>
> >> chia_04: That makes sense.
> >>
> >> On Thu, Jan 15, 2026 at 10:59 PM Alieh Saeedi via dev <[email protected]> wrote:
> >>
> >>> Hi Chia-Ping Tsai,
> >>>
> >>> Thanks for the feedback.
> >>>
> >>> chia_03: The difficulty with adding a header length is deciding
> >>> between a fixed-size field for all records or a configuration allowing
> >>> users to define a maximum. Alternatively, we could consider using a
> >>> varint for the header length to remain flexible and space-efficient.
> >>>
> >>> chia_04:
> >>> It only makes sense to give the second column family its own RocksDB
> >>> config if its access pattern or data characteristics are materially
> >>> different.
> >>> Here we have the same keys, the same or very similar read/write
> >>> patterns (e.g., same get, put, range queries), and roughly comparable
> >>> value sizes (CF2 slightly larger per entry).
> >>> Then from RocksDB’s perspective the two CFs behave very similarly:
> >>> both are generic key–value blobs, written and read with the same
> >>> pattern. Most of the important RocksDB options (compaction style,
> >>> write buffer sizes, block cache, bloom filters, etc.) would be tuned
> >>> the same way for both.
> >>> Do you see huge difference between these two?
> >>>
> >>> Thanks,
> >>> Alieh
> >>>
> >>> On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]> wrote:
> >>>
> >>>> hi
> >>>>
> >>>> chia_03: should we provide a more effective way to load the value
> >>>> without scanning the header bytes? (e.g., by storing the total size of
> >>>> headers)
> >>>>
> >>>> chia_04: Do we need to allow separate Rocksdb configuration for the
> >>>> new column family
> >>>>
> >>>> Best,
> >>>> Chia-Ping
> >>>>
> >>>> On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
> >>>>> Hi all,
> >>>>>
> >>>>> I’d like to start a discussion on KIP-1271, which proposes allowing
> >>>>> Kafka Streams state stores to preserve record headers.
> >>>>> This would let header-based metadata like schema IDs, tracing info,
> >>>>> and feature flags be stored and restored alongside values.
> >>>>> The KIP introduces header-aware store types and a small config to cap
> >>>>> the size of headers written into state.
> >>>>> Details are in the KIP:
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> >>>>> I’d appreciate your feedback and questions on the proposal.
> >>>>>
> >>>>> Thanks,
> >>>>> Alieh
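
For reference, the lazy-deserialization argument from MJS-5 and chia_03 can be sketched in a few lines of Java. This is only a minimal illustration under stated assumptions, not the KIP-1271 implementation: it assumes a simplified value layout of [headers_size][headers][payload] with headers_size written as an unsigned varint, and the class `HeaderLayoutSketch` and its method names are hypothetical. The varint loops follow the common 7-bits-per-byte unsigned encoding; the actual stored format may contain further fields.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; names and layout are assumptions, not KIP code.
final class HeaderLayoutSketch {

    // Assumed layout: [headers_size (unsigned varint)][headers bytes][payload bytes]
    static byte[] encode(byte[] headers, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(headers.length, out);
        out.write(headers, 0, headers.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Lazy read: decode only the varint, then jump over the header bytes
    // without parsing them.
    static byte[] payloadOnly(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int headersSize = readUnsignedVarint(buf);
        int payloadStart = buf.position() + headersSize;
        return Arrays.copyOfRange(stored, payloadStart, stored.length);
    }

    // 7 data bits per byte, high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is too long");
            }
        }
        return value | (b << shift);
    }

    public static void main(String[] args) {
        byte[] headers = "trace-id=abc".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "some-value".getBytes(StandardCharsets.UTF_8);
        byte[] stored = encode(headers, payload);
        // Only the size prefix is decoded; the header bytes are skipped.
        System.out.println(new String(payloadOnly(stored), StandardCharsets.UTF_8));
    }
}
```

A range scan that only needs values would call something like `payloadOnly(...)` per entry and never parse the headers. With the alternative [payload_size][payload][headers] order the skip logic is symmetric, which is consistent with the observation in the thread that both layouts permit lazy deserialization and differ mainly in how many bytes the varint prefix needs.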
