[ 
https://issues.apache.org/jira/browse/KAFKA-20311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-20311.
-------------------------------------
      Assignee:     (was: sanghyeok An)
    Resolution: Not A Problem

>   Cached metered key-value stores can deserialize old values with the wrong 
> headers when sendOldValues=true
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20311
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: sanghyeok An
>            Priority: Minor
>              Labels: kip-1271, streams
>
>  In the cached flush path, MeteredKeyValueStore deserializes both newValue 
> and oldValue using the single set of headers carried by the flush record.
>  
>  However, the flush record is built from:
>  * newValue bytes from the cache
>  * oldValue bytes read from the underlying store
>  * one Record.headers() from the cached entry being flushed
> This means oldValue bytes and newValue bytes are not guaranteed to have been 
> serialized with the same headers.
> When *sendOldValues=true* and a *header-aware serde* is used, *oldValue may 
> therefore be deserialized with headers that differ from the headers 
> originally used to serialize it.*
> As a result:
>  * oldValue may be reconstructed incorrectly, or
>  * deserialization may fail with an exception, depending on the serde 
> implementation.
>  
> h2. *Affected Stores*
>  * MeteredWindowStore
>  * MeteredKeyValueStore
>  * MeteredSessionStore
>  
> h2. *Why this happens*
>   CachingKeyValueStore creates the flush record by:
>  # taking newValue bytes from the cache
>  # reading oldValue bytes from the underlying store
>  # attaching entry.entry().context().headers() to the flush Record
> MeteredKeyValueStore.setFlushListener then deserializes both change.newValue 
> and change.oldValue using *record.headers()*.
>  
> h2. *Conditions*
>   This issue requires:
>  * caching enabled
>  * sendOldValues=true
>  * a serde whose deserialize(topic, headers, bytes) behavior depends on 
> headers
>  
> h2. *Expected behavior*
> The cached flush path should not deserialize oldValue with flush-record 
> headers unless those headers are guaranteed to match the headers originally 
> used to serialize oldValue.
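
To make the scenario in the report concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: headers are modeled as a Map<String, String>, and the class HeaderMismatchSketch, the "encoding" header, and the decodeChange helper are all hypothetical names invented for illustration. It only shows what can happen when bytes serialized under one set of headers are deserialized under another, for a serde whose behavior depends on headers.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderMismatchSketch {

    // Hypothetical rule: an "encoding" header of "utf16" selects UTF-16BE,
    // anything else falls back to UTF-8.
    static Charset charsetFor(Map<String, String> headers) {
        return "utf16".equals(headers.get("encoding"))
                ? StandardCharsets.UTF_16BE
                : StandardCharsets.UTF_8;
    }

    // Header-aware serialization: the output bytes depend on the headers.
    static byte[] serialize(Map<String, String> headers, String value) {
        return value.getBytes(charsetFor(headers));
    }

    // Header-aware deserialization: the same bytes decode differently
    // under different headers.
    static String deserialize(Map<String, String> headers, byte[] bytes) {
        return new String(bytes, charsetFor(headers));
    }

    // Stand-in for the flush listener described in the report: newValue and
    // oldValue are both decoded with the single header set of the flush record.
    static String[] decodeChange(Map<String, String> flushHeaders,
                                 byte[] newBytes, byte[] oldBytes) {
        return new String[] {
            deserialize(flushHeaders, newBytes),
            deserialize(flushHeaders, oldBytes)
        };
    }

    public static void main(String[] args) {
        Map<String, String> oldHeaders = Map.of("encoding", "utf16");
        Map<String, String> newHeaders = Map.of("encoding", "utf8");

        // oldValue was written earlier under different headers than newValue.
        byte[] oldBytes = serialize(oldHeaders, "old");
        byte[] newBytes = serialize(newHeaders, "new");

        // The flush record carries only the headers of the entry being flushed.
        String[] change = decodeChange(newHeaders, newBytes, oldBytes);

        System.out.println("newValue intact: " + change[0].equals("new")); // true
        System.out.println("oldValue intact: " + change[1].equals("old")); // false
        System.out.println("oldValue with its own headers: "
                + deserialize(oldHeaders, oldBytes).equals("old"));        // true
    }
}
```

In this toy model the mismatch only corrupts the decoded string. A serde that parses a structured format selected by a header could instead throw, which corresponds to the second outcome listed in the report.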



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
