This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fad14e74e8 MINOR: use correct deserialize method in
ValueTimestampHeaders (#21449)
5fad14e74e8 is described below
commit 5fad14e74e827a221ad9a3570e79a5b2cf3889da
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 11 08:34:21 2026 +0000
MINOR: use correct deserialize method in ValueTimestampHeaders (#21449)
When (de)serializing `ValueTimestampHeader` object, we need to use
`deserialize(String, Headers, byte[])` instead of `deserialize(String,
byte[])`.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../ValueTimestampHeadersDeserializer.java | 2 +-
.../ValueTimestampHeadersDeserializerTest.java | 29 ++++++++++++++++++++++
.../ValueTimestampHeadersSerializerTest.java | 28 +++++++++++++++++++++
3 files changed, 58 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index bbd91ae247b..b67136ee41d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -83,7 +83,7 @@ class ValueTimestampHeadersDeserializer<V> implements
WrappingNullableDeserializ
final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
final long timestamp = timestampDeserializer.deserialize(topic,
rawTimestamp);
final byte[] rawValue = readBytes(buffer, buffer.remaining());
- final V value = valueDeserializer.deserialize(topic, rawValue);
+ final V value = valueDeserializer.deserialize(topic, headers,
rawValue);
return ValueTimestampHeaders.make(value, timestamp, headers);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
index 806755b213c..8a4471580c6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -36,6 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ValueTimestampHeadersDeserializerTest {
@@ -275,4 +283,25 @@ public class ValueTimestampHeadersDeserializerTest {
"Should throw SerializationException when buffer doesn't have
enough data"
);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldPassHeadersToDeserializer() {
+ final Deserializer<String> mockDeserializer = mock(Deserializer.class);
+ when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class)))
+ .thenReturn("test-value");
+
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final ValueTimestampHeadersDeserializer<String> testDeserializer =
+ new ValueTimestampHeadersDeserializer<>(mockDeserializer);
+ testDeserializer.deserialize(TOPIC, serialized);
+
+ // we should invoke the deserialize(String, Headers, byte) instead of
deserialize(String, byte)
+ verify(mockDeserializer).deserialize(eq(TOPIC), any(Headers.class),
any(byte[].class));
+ verify(mockDeserializer, never()).deserialize(eq(TOPIC),
any(byte[].class));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
index 7326696cc12..32a59e1dc6f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.junit.jupiter.api.AfterEach;
@@ -29,6 +30,13 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ValueTimestampHeadersSerializerTest {
@@ -124,4 +132,24 @@ public class ValueTimestampHeadersSerializerTest {
final byte[] serialized = serializer.serialize(TOPIC,
valueTimestampHeaders);
assertNull(serialized);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldPassHeadersToSerializer() {
+ final Serializer<String> mockSerializer = mock(Serializer.class);
+ when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString()))
+ .thenReturn("test-value".getBytes());
+
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ final ValueTimestampHeadersSerializer<String> testSerializer =
+ new ValueTimestampHeadersSerializer<>(mockSerializer);
+ testSerializer.serialize(TOPIC, valueTimestampHeaders);
+
+ // we should invoke the serialize(String, Headers, Object) instead of
serialize(String, Object)
+ verify(mockSerializer).serialize(eq(TOPIC), any(Headers.class),
eq(VALUE));
+ verify(mockSerializer, never()).serialize(eq(TOPIC), eq(VALUE));
+ }
}