[
https://issues.apache.org/jira/browse/KAFKA-20311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-20311.
-------------------------------------
Assignee: (was: sanghyeok An)
Resolution: Not A Problem
> Cached metered key-value stores can deserialize old values with the wrong
> headers when sendOldValues=true
> -----------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20311
> URL: https://issues.apache.org/jira/browse/KAFKA-20311
> Project: Kafka
> Issue Type: Sub-task
> Reporter: sanghyeok An
> Priority: Minor
> Labels: kip-1271, streams
>
> In the cached flush path, MeteredKeyValueStore deserializes both newValue
> and oldValue using the single headers set carried by the flush record.
>
> However, the flush record is built from:
> * newValue bytes from the cache
> * oldValue bytes read from the underlying store
> * one Record.headers() from the cached entry being flushed
> This means oldValue bytes and newValue bytes are not guaranteed to have been
> serialized with the same headers.
> When *sendOldValues=true* and a *header-aware serde* is used, *oldValue may
> therefore be deserialized with headers that differ from the headers
> originally used to serialize it*.
> As a result:
> * oldValue may be reconstructed incorrectly, or
> * deserialization may fail with an exception, depending on the serde
> implementation.
>
> h2. *Affected Stores*
> * MeteredWindowStore
> * MeteredKeyValueStore
> * MeteredSessionStore
>
> h2. *Why this happens*
> CachingKeyValueStore creates the flush record by:
> # taking newValue bytes from the cache
> # reading oldValue bytes from the underlying store
> # attaching entry.entry().context().headers() to the flush Record
> MeteredKeyValueStore.setFlushListener then deserializes both change.newValue
> and change.oldValue using *record.headers()*.
>
> h2. *Conditions*
> This issue requires:
> * caching enabled
> * sendOldValues=true
> * a serde whose deserialize(topic, headers, bytes) behavior depends on
> headers
>
> h2. *Expected behavior*
> The cached flush path should not deserialize oldValue with flush-record
> headers unless those headers are guaranteed to match the headers originally
> used to serialize oldValue.
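
The mismatch described in the report can be illustrated in isolation. The sketch below is not Kafka code: it uses a plain Map as a stand-in for record headers, and a hypothetical serde whose charset is chosen by an "encoding" header, to mimic a serde whose deserialize(topic, headers, bytes) behavior depends on headers. The class name, the helper names, and the "encoding" header are all assumptions made for illustration only.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderMismatchSketch {

    // Hypothetical header-aware serde: the "encoding" header picks the charset.
    public static Charset charsetFor(Map<String, String> headers) {
        return "utf16".equals(headers.get("encoding"))
            ? StandardCharsets.UTF_16BE
            : StandardCharsets.UTF_8;
    }

    public static byte[] serialize(Map<String, String> headers, String value) {
        return value.getBytes(charsetFor(headers));
    }

    public static String deserialize(Map<String, String> headers, byte[] bytes) {
        return new String(bytes, charsetFor(headers));
    }

    // Mimics the reported flush path: newValue and oldValue are both decoded
    // with the single headers set carried by the flush record.
    public static String[] decodeChange(Map<String, String> flushHeaders,
                                        byte[] newBytes,
                                        byte[] oldBytes) {
        return new String[] {
            deserialize(flushHeaders, newBytes),
            deserialize(flushHeaders, oldBytes)
        };
    }

    public static void main(String[] args) {
        // The old value was written earlier, under different headers.
        Map<String, String> oldHeaders = Map.of("encoding", "utf8");
        Map<String, String> newHeaders = Map.of("encoding", "utf16");

        byte[] oldBytes = serialize(oldHeaders, "v1"); // bytes in the underlying store
        byte[] newBytes = serialize(newHeaders, "v2"); // bytes in the cache

        // The flush record carries only the headers of the cached (new) entry.
        String[] change = decodeChange(newHeaders, newBytes, oldBytes);

        System.out.println("newValue round-trips: " + "v2".equals(change[0]));
        System.out.println("oldValue round-trips: " + "v1".equals(change[1]));
    }
}
```

In this sketch newValue round-trips while oldValue does not, because the old bytes are decoded with headers they were never serialized under. A stricter serde could throw at that point instead of returning a wrong value.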