Hi all, The `convertToHeaderFormat(final byte[] value)` has been corrected from the previous, mistaken signature `convertToHeaderFormat(final byte[] key, final byte[] value)`.
Bests, Alieh On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote: > Hi all > > Regarding SessionStoreWithHeaders, we reintroduced the > AggregationWithHeaders data type so we no longer rely on > ValueTimestampHeaders, thereby avoiding storage of unnecessary timestamps > for sessions in session stores. > > Bests, > Alieh > > On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote: > >> Hey Matthias, >> >> Thanks for all the good points you raised. >> >> MJS-5: It’s good that we walked through all the possible options. It >> initially seemed like a nice idea, but as you pointed out, there’s no real >> benefit and we might even end up with longer values. In both formats we >> still need to compute an offset by reading a varint to retrieve the value. >> I also updated the KIP and added a few sentences to clarify why we store >> headers_size as well. >> >> MJS-6: Yes, KIP‑258 already applied the same pattern for window/session >> stores: KV uses dual column families with lazy per‑entry migration, while >> window/session stores do a clean break at the segment level so old segments >> stay in the legacy format and new ones use the new format. Segment‑level >> versioning maps well to windows/sessions because they’re already >> time‑segmented and constrained by retention, so we avoid dual‑CF complexity >> in every small segment DB while still getting a natural rolling upgrade as >> old segments age out. Compared with a dual‑CF “dual accessor” approach for >> window/session, the clean break is less code, easier to reason about, and >> reduces RocksDB overhead, with the trade‑off that legacy segments never get >> backfilled with new metadata unless one explicitly rebuilds or migrates the >> state. I added some clarification to the KIP on that point as well. >> >> MJS-7 and MJS-8: Great catches—I've updated the Compatibility and Testing >> sections accordingly. >> >> Best, >> Alieh >> >> On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote: >> >>> Great discussion! Seems we are heading into the right direction. >>> >>> Thanks for clarifying the open question about the header serialization >>> format, VersionedRecordWithHeaders, StateSerdes, and upgrade path. >>> >>> >>> A few follow up questions: >>> >>> MJS-5: As we are keeping `headers_size` now, I am wondering if there >>> would be a benefit to change the byte format to the same order as used >>> in Kafka messages, ie >>> >>> [payload_size][payload][headers_byte] >>> >>> The only disadvantage I see would be, that I expect `header_size` to be >>> smaller than `payload_size` for most cases, so we might need a little >>> bit more space on average for the var-int encoding. But in both cases, >>> we would be able to implement lazy deserialization. Not saying we have >>> to do it this way -- in general I agree there is not much benefit to use >>> the same order as Kafka messages do as it was already pointed out. Just >>> wanted to mention it for completeness. Thoughts? >>> >>> >>> MJS-5-B: One request though: the KIP should explain why we need to add >>> `header_size` (or `payload_size` in case we really make this change). >>> Reading the KIP as-is, I would always ask myself why we would need >>> `header_size` -- so mentioning lazy deserialization explicitly as reason >>> why we add this field would be great to not puzzle readers about it. -- >>> The KIP mentions lazy-deserialization later in the "Compatibility" >>> section, but does not make the connection to `header_size` field >>> explicit in this section either. >>> >>> >>> MJS-6. For the upgrade path the KIP mentions >>> >>> > Window/Session: Employs a clean break at the segment level—old >>> segments stay as-is; new segments use the new format. >>> >>> I am wondering why we do it this way? Did KIP-258 also do this (I cannot >>> remember). It's an interesting idea. I am just wondering about pros/cons >>> compared to follow the same dual-cf-accessor path as we do for >>> non-windowed stores. Also from an implementation POV -- would it be more >>> or less code to write? >>> >>> >>> MJS-7. In the "Compatibility" section the KIP states >>> >>> > Backward Compatibility >>> > - Public API: No existing APIs are deprecated. The new header-aware >>> interfaces and factory methods are additive. >>> >>> As we deprecate some methods on `StateSerdes` now, this is not correct >>> any longer and should be updated. >>> >>> >>> MJS-8: Testing. -- There is no mentioning of system tests. And maybe we >>> don't need any. But might be good to be explicit. Did KIP-258 add new >>> system tests? >>> >>> >>> >>> @TengYao: Yes, your understanding of KS/Windowed vs Session store is >>> correct. It's really all about the optimization to avoid storing "event >>> time" for sessions twice, as we know "event time == window-end". That's >>> why using `ValueTimestampHeaders` for header-session store might not be >>> ideal, as we would lose this optimization. Introducing >>> `AggregationWithHeaders` is an attempt to keep this optimization though. >>> >>> >>> >>> >>> -Matthias >>> >>> >>> >>> >>> On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote: >>> > Updates to KIP >>> > >>> > - >>> > >>> > 1- A varint header_size field is introduced to enable lazy >>> deserialization >>> > when scanning large ranges. >>> > - >>> > >>> > 2- The current serialization/deserialization methods in StateSerdes are >>> > marked as deprecated to keep the class concise. >>> > - >>> > >>> > 3- Note that VersionedKeyValueStoreWithHeaders cannot extend >>> > VersionedKeyValueStore because their methods differ in input and/or >>> output >>> > types. In particular, the VersionedRecord returned by >>> VersionedKeyValueStore >>> > methods is a final class and therefore cannot be subclassed. >>> > >>> > Thanks, >>> > Alieh >>> > >>> > On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> >>> wrote: >>> > >>> >> chia_03: Regarding the header size, using a Varint is consistent with >>> >> Kafka's serialization standards. It avoids the overhead of a large >>> >> fixed-size field while still achieving the efficient skipping >>> capability we >>> >> want. >>> >> >>> >> chia_04: That makes sense. >>> >> >>> >> Alieh Saeedi via dev <[email protected]> 於 2026年1月15日週四 下午10:59寫道: >>> >> >>> >>> Hi Chia-Ping Tsai, >>> >>> >>> >>> Thanks for the feedback. >>> >>> >>> >>> chia_03: The difficulty with adding a header length is deciding >>> between a >>> >>> fixed-size field for all records or a configuration allowing users to >>> >>> define a maximum. Alternatively, we could consider using a varint >>> for the >>> >>> header length to remain flexible and space-efficient. >>> >>> >>> >>> chia_04: >>> >>> It only makes sense to give the second column family its own RocksDB >>> >>> config if its access pattern or data characteristics are materially >>> >>> different. >>> >>> Here we have the same keys, the >>> >>> same or very similar read/write patterns (e.g., same get, put, range >>> >>> queries), >>> >>> and roughly comparable value sizes (CF2 slightly larger per entry). >>> >>> Then from RocksDB’s perspective the two CFs behave very similarly: >>> >>> both are generic key–value blobs, written and read with the same >>> >>> pattern. Most of the important RocksDB options (compaction style, >>> >>> write buffer sizes, block cache, bloom filters, etc.) would be tuned >>> >>> the same way for both. >>> >>> Do you see huge difference between these two? >>> >>> >>> >>> Thanks, >>> >>> Alieh >>> >>> >>> >>> On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]> >>> >>> wrote: >>> >>> >>> >>>> hi >>> >>>> >>> >>>> chia_03: should we provide a more effective way to load the value >>> >>> without >>> >>>> scanning the header bytes? (e.g., by storing the total size of >>> headers) >>> >>>> >>> >>>> chia_04: Do we need to allow separate Rocksdb configuration for the >>> new >>> >>>> column family >>> >>>> >>> >>>> Best, >>> >>>> Chia-Ping >>> >>>> >>> >>>> On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote: >>> >>>>> Hi all, >>> >>>>> >>> >>>>> I’d like to start a discussion on KIP-1271, which proposes allowing >>> >>> Kafka >>> >>>>> Streams state stores to preserve record headers. >>> >>>>> This would let header-based metadata like schema IDs, tracing info, >>> >>> and >>> >>>>> feature flags be stored and restored alongside values. >>> >>>>> The KIP introduces header-aware store types and a small config to >>> cap >>> >>> the >>> >>>>> size of headers written into state. >>> >>>>> Details are in the KIP: >>> >>>>> >>> >>>> >>> >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores >>> >>>>> . >>> >>>>> I’d appreciate your feedback and questions on the proposal. >>> >>>>> >>> >>>>> Thanks, >>> >>>>> Alieh >>> >>>>> >>> >>>> >>> >>> >>> >> >>> > >>> >>>
