Hi all,
The `convertToHeaderFormat(final byte[] value)` has been corrected from the
previous, mistaken signature `convertToHeaderFormat(final byte[] key, final
byte[] value)`.

Bests,
Alieh

On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote:

> Hi all
>
> Regarding SessionStoreWithHeaders, we reintroduced the
> AggregationWithHeaders data type so we no longer rely on
> ValueTimestampHeaders, thereby avoiding storage of unnecessary timestamps
> for sessions in session stores.
>
> Bests,
> Alieh
>
> On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote:
>
>> Hey Matthias,
>>
>> Thanks for all the good points you raised.
>>
>> MJS-5: It’s good that we walked through all the possible options. It
>> initially seemed like a nice idea, but as you pointed out, there’s no real
>> benefit and we might even end up with longer values. In both formats we
>> still need to compute an offset by reading a varint to retrieve the value.
>> I also updated the KIP and added a few sentences to clarify why we store
>> headers_size as well.
>>
>> MJS-6: Yes, KIP‑258 already applied the same pattern for window/session
>> stores: KV uses dual column families with lazy per‑entry migration, while
>> window/session stores do a clean break at the segment level so old segments
>> stay in the legacy format and new ones use the new format. Segment‑level
>> versioning maps well to windows/sessions because they’re already
>> time‑segmented and constrained by retention, so we avoid dual‑CF complexity
>> in every small segment DB while still getting a natural rolling upgrade as
>> old segments age out. Compared with a dual‑CF “dual accessor” approach for
>> window/session, the clean break is less code, easier to reason about, and
>> reduces RocksDB overhead, with the trade‑off that legacy segments never get
>> backfilled with new metadata unless one explicitly rebuilds or migrates the
>> state. I added some clarification to the KIP on that point as well.
>>
>> MJS-7 and MJS-8: Great catches—I've updated the Compatibility and Testing
>> sections accordingly.
>>
>> Best,
>> Alieh
>>
>> On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:
>>
>>> Great discussion! Seems we are heading into the right direction.
>>>
>>> Thanks for clarifying the open question about the header serialization
>>> format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.
>>>
>>>
>>> A few follow up questions:
>>>
>>> MJS-5: As we are keeping `headers_size` now, I am wondering if there
>>> would be a benefit to change the byte format to the same order as used
>>> in Kafka messages, ie
>>>
>>>    [payload_size][payload][headers_byte]
>>>
>>> The only disadvantage I see would be, that I expect `header_size` to be
>>> smaller than `payload_size` for most cases, so we might need a little
>>> bit more space on average for the var-int encoding. But in both cases,
>>> we would be able to implement lazy deserialization. Not saying we have
>>> to do it this way -- in general I agree there is not much benefit to use
>>> the same order as Kafka messages do as it was already pointed out. Just
>>> wanted to mention it for completeness. Thoughts?
>>>
>>>
>>> MJS-5-B: One request though: the KIP should explain why we need to add
>>> `header_size` (or `payload_size` in case we really make this change).
>>> Reading the KIP as-is, I would always ask myself why we would need
>>> `header_size` -- so mentioning lazy deserialization explicitly as reason
>>> why we add this field would be great to not puzzle readers about it. --
>>> The KIP mentions lazy-deserialization later in the "Compatibility"
>>> section, but does not make the connection to `header_size` field
>>> explicit in this section either.
>>>
>>>
>>> MJS-6. For the upgrade path the KIP mentions
>>>
>>> > Window/Session: Employs a clean break at the segment level—old
>>> segments stay as-is; new segments use the new format.
>>>
>>> I am wondering why we do it this way? Did KIP-258 also do this (I cannot
>>> remember). It's an interesting idea. I am just wondering about pros/cons
>>> compared to follow the same dual-cf-accessor path as we do for
>>> non-windowed stores. Also from an implementation POV -- would it be more
>>> or less code to write?
>>>
>>>
>>> MJS-7. In the "Compatibility" section the KIP states
>>>
>>> > Backward Compatibility
>>> > - Public API: No existing APIs are deprecated. The new header-aware
>>> interfaces and factory methods are additive.
>>>
>>> As we deprecate some methods on `StateSerdes` now, this is not correct
>>> any longer and should be updated.
>>>
>>>
>>> MJS-8: Testing. -- There is no mentioning of system tests. And maybe we
>>> don't need any. But might be good to be explicit. Did KIP-258 add new
>>> system tests?
>>>
>>>
>>>
>>> @TengYao: Yes, your understanding of KS/Windowed vs Session store is
>>> correct. It's really all about the optimization to avoid storing "event
>>> time" for sessions twice, as we know "event time == window-end". That's
>>> why using `ValueTimestampHeaders` for header-session store might not be
>>> ideal, as we would lose this optimization. Introducing
>>> `AggregationWithHeaders` is an attempt to keep this optimization though.
>>>
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>>
>>> On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
>>> > Updates to KIP
>>> >
>>> > -
>>> >
>>> > 1- A varint header_size field is introduced to enable lazy
>>> deserialization
>>> > when scanning large ranges.
>>> > -
>>> >
>>> > 2- The current serialization/deserialization methods in StateSerdes are
>>> > marked as deprecated to keep the class concise.
>>> > -
>>> >
>>> > 3- Note that VersionedKeyValueStoreWithHeaders cannot extend
>>> > VersionedKeyValueStore because their methods differ in input and/or
>>> output
>>> > types. In particular, the VersionedRecord returned by
>>> VersionedKeyValueStore
>>> > methods is a final class and therefore cannot be subclassed.
>>> >
>>> > Thanks,
>>> > Alieh
>>> >
>>> > On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]>
>>> wrote:
>>> >
>>> >> chia_03: Regarding the header size, using a Varint is consistent with
>>> >> Kafka's serialization standards. It avoids the overhead of a large
>>> >> fixed-size field while still achieving the efficient skipping
>>> capability we
>>> >> want.
>>> >>
>>> >> chia_04: That makes sense.
>>> >>
>>> >> Alieh Saeedi via dev <[email protected]> 於 2026年1月15日週四 下午10:59寫道:
>>> >>
>>> >>> Hi Chia-Ping Tsai,
>>> >>>
>>> >>> Thanks for the feedback.
>>> >>>
>>> >>> chia_03: The difficulty with adding a header length is deciding
>>> between a
>>> >>> fixed-size field for all records or a configuration allowing users to
>>> >>> define a maximum. Alternatively, we could consider using a varint
>>> for the
>>> >>> header length to remain flexible and space-efficient.
>>> >>>
>>> >>> chia_04:
>>> >>> It only makes sense to give the second column family its own RocksDB
>>> >>> config if its access pattern or data characteristics are materially
>>> >>> different.
>>> >>> Here we have the same keys, the
>>> >>> same or very similar read/write patterns (e.g., same get, put, range
>>> >>> queries),
>>> >>> and roughly comparable value sizes (CF2 slightly larger per entry).
>>> >>> Then from RocksDB’s perspective the two CFs behave very similarly:
>>> >>> both are generic key–value blobs, written and read with the same
>>> >>> pattern. Most of the important RocksDB options (compaction style,
>>> >>> write buffer sizes, block cache, bloom filters, etc.) would be tuned
>>> >>> the same way for both.
>>> >>> Do you see huge difference between these two?
>>> >>>
>>> >>> Thanks,
>>> >>> Alieh
>>> >>>
>>> >>> On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]>
>>> >>> wrote:
>>> >>>
>>> >>>> hi
>>> >>>>
>>> >>>> chia_03: should we provide a more effective way to load the value
>>> >>> without
>>> >>>> scanning the header bytes? (e.g., by storing the total size of
>>> headers)
>>> >>>>
>>> >>>> chia_04: Do we need to allow separate Rocksdb configuration for the
>>> new
>>> >>>> column family
>>> >>>>
>>> >>>> Best,
>>> >>>> Chia-Ping
>>> >>>>
>>> >>>> On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
>>> >>>>> Hi all,
>>> >>>>>
>>> >>>>> I’d like to start a discussion on KIP-1271, which proposes allowing
>>> >>> Kafka
>>> >>>>> Streams state stores to preserve record headers.
>>> >>>>> This would let header-based metadata like schema IDs, tracing info,
>>> >>> and
>>> >>>>> feature flags be stored and restored alongside values.
>>> >>>>> The KIP introduces header-aware store types and a small config to
>>> cap
>>> >>> the
>>> >>>>> size of headers written into state.
>>> >>>>> Details are in the KIP:
>>> >>>>>
>>> >>>>
>>> >>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
>>> >>>>> .
>>> >>>>> I’d appreciate your feedback and questions on the proposal.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Alieh
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>>

Reply via email to