sanghyeok An created KAFKA-20311:
------------------------------------

             Summary:   Cached metered key-value stores can deserialize old 
values with the wrong headers when sendOldValues=true
                 Key: KAFKA-20311
                 URL: https://issues.apache.org/jira/browse/KAFKA-20311
             Project: Kafka
          Issue Type: Bug
            Reporter: sanghyeok An
            Assignee: sanghyeok An


 In the cached flush path, MeteredKeyValueStore deserializes both newValue and 
oldValue using the single headers set carried by the flush record.

 

 However, the flush record is built from:
 * newValue bytes from the cache
 * oldValue bytes read from the underlying store
 * one Record.headers() from the cached entry being flushed
 *  

This means oldValue bytes and newValue bytes are not guaranteed to have been 
serialized with the same headers.

 

When sendOldValues=true and a header-aware serde is used, oldValue may 
therefore be deserialized with headers that differ from the headers originally 
used to serialize it.

 

  As a result:
 * oldValue may be reconstructed incorrectly, or
 * deserialization may fail with an exception, depending on the serde 
implementation.

 

  {*}Affected Stores{*}{*}{*}
 * MeteredWindowStore
 * MeteredKeyValueStore
 * MeteredSessionStore

 

  {*}Why this happens{*}{*}{*}

  CachingKeyValueStore creates the flush record by:
 # taking newValue bytes from the cache
 # reading oldValue bytes from underlying store.
 # attaching entry.entry().context().headers() to the flush Record

 

 MeteredKeyValueStore.setFlushListener then deserializes both change.newValue 
and change.oldValue using *record.headers().*

 

  {*}Condition{*}{*}{*}

  This issue requires:
 * caching enabled
 * sendOldValues=true
 * a serde whose deserialize(topic, headers, bytes) behavior depends on headers

 

{*}Expected behavior{*}{*}{*}

The cached flush path should not deserialize oldValue with flush-record headers 
unless those headers are guaranteed to match the headers originally used to 
serialize oldValue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to