This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5fad14e74e8 MINOR: use correct deserialize method in 
ValueTimestampHeaders (#21449)
5fad14e74e8 is described below

commit 5fad14e74e827a221ad9a3570e79a5b2cf3889da
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 11 08:34:21 2026 +0000

    MINOR: use correct deserialize method in ValueTimestampHeaders (#21449)
    
    When (de)serializing a `ValueTimestampHeaders` object, we need to use
    `deserialize(String, Headers, byte[])` instead of `deserialize(String,
    byte[])`, so that the headers are passed on to the wrapped value
    deserializer.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../ValueTimestampHeadersDeserializer.java         |  2 +-
 .../ValueTimestampHeadersDeserializerTest.java     | 29 ++++++++++++++++++++++
 .../ValueTimestampHeadersSerializerTest.java       | 28 +++++++++++++++++++++
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index bbd91ae247b..b67136ee41d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -83,7 +83,7 @@ class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializ
         final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
         final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp);
         final byte[] rawValue = readBytes(buffer, buffer.remaining());
-        final V value = valueDeserializer.deserialize(topic, rawValue);
+        final V value = valueDeserializer.deserialize(topic, headers, 
rawValue);
 
         return ValueTimestampHeaders.make(value, timestamp, headers);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
index 806755b213c..8a4471580c6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -36,6 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ValueTimestampHeadersDeserializerTest {
 
@@ -275,4 +283,25 @@ public class ValueTimestampHeadersDeserializerTest {
             "Should throw SerializationException when buffer doesn't have 
enough data"
         );
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldPassHeadersToDeserializer() {
+        final Deserializer<String> mockDeserializer = mock(Deserializer.class);
+        when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class)))
+            .thenReturn("test-value");
+
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final ValueTimestampHeadersDeserializer<String> testDeserializer =
+            new ValueTimestampHeadersDeserializer<>(mockDeserializer);
+        testDeserializer.deserialize(TOPIC, serialized);
+
+        // we should invoke deserialize(String, Headers, byte[]) instead of 
deserialize(String, byte[])
+        verify(mockDeserializer).deserialize(eq(TOPIC), any(Headers.class), 
any(byte[].class));
+        verify(mockDeserializer, never()).deserialize(eq(TOPIC), 
any(byte[].class));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
index 7326696cc12..32a59e1dc6f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
 import org.junit.jupiter.api.AfterEach;
@@ -29,6 +30,13 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ValueTimestampHeadersSerializerTest {
 
@@ -124,4 +132,24 @@ public class ValueTimestampHeadersSerializerTest {
         final byte[] serialized = serializer.serialize(TOPIC, 
valueTimestampHeaders);
         assertNull(serialized);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldPassHeadersToSerializer() {
+        final Serializer<String> mockSerializer = mock(Serializer.class);
+        when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString()))
+            .thenReturn("test-value".getBytes());
+
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders =
+            ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        final ValueTimestampHeadersSerializer<String> testSerializer =
+            new ValueTimestampHeadersSerializer<>(mockSerializer);
+        testSerializer.serialize(TOPIC, valueTimestampHeaders);
+
+        // we should invoke the serialize(String, Headers, Object) instead of 
serialize(String, Object)
+        verify(mockSerializer).serialize(eq(TOPIC), any(Headers.class), 
eq(VALUE));
+        verify(mockSerializer, never()).serialize(eq(TOPIC), eq(VALUE));
+    }
 }

Reply via email to