This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45b5df2bc7d KAFKA-20121: Create ValueTimestampHeaders and its 
serializer/deserializer (#21408)
45b5df2bc7d is described below

commit 45b5df2bc7d0d1ed2e40b3c9e0aff2edf8fdda94
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Feb 10 18:46:45 2026 +0000

    KAFKA-20121: Create ValueTimestampHeaders and its serializer/deserializer 
(#21408)
    
    This PR adds the `ValueTimestampHeaders` and its related
    serializer/deserializer as infrastructure of KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../kafka/streams/state/ValueTimestampHeaders.java | 127 ++++++++++
 .../state/internals/HeadersDeserializer.java       |   6 +
 .../streams/state/internals/HeadersSerializer.java |   3 +-
 .../ValueTimestampHeadersDeserializer.java         | 167 +++++++++++++
 .../internals/ValueTimestampHeadersSerde.java      |  40 +++
 .../internals/ValueTimestampHeadersSerializer.java | 128 ++++++++++
 .../streams/state/ValueTimestampHeadersTest.java   | 158 ++++++++++++
 .../state/internals/HeadersDeserializerTest.java   |  34 +++
 .../state/internals/HeadersSerializerTest.java     |  25 --
 .../ValueTimestampHeadersDeserializerTest.java     | 278 +++++++++++++++++++++
 .../ValueTimestampHeadersSerializerTest.java       | 127 ++++++++++
 11 files changed, 1067 insertions(+), 26 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java
new file mode 100644
index 00000000000..6a924134867
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.util.Objects;
+
+/**
+ * Combines a value with its timestamp and associated record headers.
+ *
+ * @param <V> the value type
+ */
+public final class ValueTimestampHeaders<V> {
+
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    private ValueTimestampHeaders(final V value, final long timestamp, final 
Headers headers) {
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = headers == null ? new RecordHeaders() : headers;
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance if the provided 
{@code value} is not {@code null}.
+     *
+     * @param value     the value
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance if the provided 
{@code value} is not {@code null};
+     * otherwise {@code null} is returned
+     */
+    public static <V> ValueTimestampHeaders<V> make(final V value,
+                                                    final long timestamp,
+                                                    final Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance.
+     * The provided {@code value} may be {@code null}.
+     *
+     * @param value     the value (may be {@code null})
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance
+     */
+    public static <V> ValueTimestampHeaders<V> makeAllowNullable(final V value,
+                                                                 final long 
timestamp,
+                                                                 final Headers 
headers) {
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Return the wrapped {@code value} of the given {@code 
valueTimestampHeaders} parameter
+     * if the parameter is not {@code null}.
+     *
+     * @param valueTimestampHeaders a {@link ValueTimestampHeaders} instance; 
can be {@code null}
+     * @param <V>                   the type of the value
+     * @return the wrapped {@code value} of {@code valueTimestampHeaders} if 
not {@code null}; otherwise {@code null}
+     */
+    public static <V> V getValueOrNull(final ValueTimestampHeaders<V> 
valueTimestampHeaders) {
+        return valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ValueTimestampHeaders)) {
+            return false;
+        }
+        final ValueTimestampHeaders<?> that = (ValueTimestampHeaders<?>) o;
+        return timestamp == that.timestamp
+            && Objects.equals(value, that.value)
+            && Objects.equals(this.headers, that.headers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, timestamp, headers);
+    }
+
+    @Override
+    public String toString() {
+        return "ValueTimestampHeaders{" +
+            "value=" + value +
+            ", timestamp=" + timestamp +
+            ", headers=" + headers +
+            '}';
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
index 70a1107d981..88ca8848bdc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
@@ -86,4 +86,10 @@ public class HeadersDeserializer implements 
Deserializer<Headers> {
 
         return headers;
     }
+
+    public static Headers deserialize(final byte[] data) {
+        try (HeadersDeserializer deserializer = new HeadersDeserializer()) {
+            return deserializer.deserialize("", data);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index 91a97e78e4b..66f2c4e0da9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
@@ -93,7 +94,7 @@ public class HeadersSerializer implements Serializer<Headers> 
{
 
             return baos.toByteArray();
         } catch (IOException e) {
-            throw new RuntimeException("Failed to serialize headers", e);
+            throw new SerializationException("Failed to serialize headers", e);
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
new file mode 100644
index 00000000000..bbd91ae247b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ *   - For null/empty headers: headersSize = 0, headersBytes is omitted (0 
bytes)
+ *   - For non-empty headers: headersSize > 0, serialized headers in the 
format [count(varint)][header1][header2]... to be processed by 
HeadersDeserializer.
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value 
deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers 
from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
+    private static final HeadersDeserializer HEADERS_DESERIALIZER = new 
HeadersDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final LongDeserializer timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) 
{
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        headersDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> deserialize(final String topic, final 
byte[] valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+
+        final byte[] rawHeaders = readBytes(buffer, headersSize);
+        final Headers headers = headersDeserializer.deserialize(topic, 
rawHeaders);
+        final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+        final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp);
+        final byte[] rawValue = readBytes(buffer, buffer.remaining());
+        final V value = valueDeserializer.deserialize(topic, rawValue);
+
+        return ValueTimestampHeaders.make(value, timestamp, headers);
+    }
+
+    @Override
+    public void close() {
+        valueDeserializer.close();
+        timestampDeserializer.close();
+        headersDeserializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersDeserializer never wraps a null deserializer 
(or configure would throw),
+        // but it may wrap a deserializer that itself wraps a null 
deserializer.
+        initNullableDeserializer(valueDeserializer, getter);
+    }
+
+    /**
+     * Reads the specified number of bytes from the buffer with validation.
+     *
+     * @param buffer the ByteBuffer to read from
+     * @param length the number of bytes to read
+     * @return the byte array containing the read bytes
+     * @throws SerializationException if buffer doesn't have enough bytes
+     */
+    private static byte[] readBytes(final ByteBuffer buffer, final int length) 
{
+        if (buffer.remaining() < length) {
+            throw new SerializationException(
+                "Invalid ValueTimestampHeaders format: expected " + length +
+                " bytes but only " + buffer.remaining() + " bytes remaining"
+            );
+        }
+        final byte[] bytes = new byte[length];
+        buffer.get(bytes);
+        return bytes;
+    }
+
+    /**
+     * Extract value from serialized ValueTimestampHeaders.
+     */
+    static <T> T value(final byte[] rawValueTimestampHeaders, final 
Deserializer<T> deserializer) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // skip headers plus timestamp
+        buffer.position(buffer.position() + headersSize + Long.BYTES);
+        byte[] bytes = readBytes(buffer, buffer.remaining());
+
+        return deserializer.deserialize("", bytes);
+    }
+
+    /**
+     * Extract timestamp from serialized ValueTimestampHeaders.
+     */
+    static long timestamp(final byte[] rawValueTimestampHeaders) {
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+        return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+    }
+
+    /**
+     * Extract headers from serialized ValueTimestampHeaders.
+     */
+    static Headers headers(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        final byte[] rawHeaders = readBytes(buffer, headersSize);
+        return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java
new file mode 100644
index 00000000000..90901a6d9e0
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Serde for ValueTimestampHeaders.
+ *
+ * This serde wraps a value serde and handles serialization/deserialization of
+ * values along with their timestamps and headers.
+ *
+ * This is used by KIP-1271 to support headers in state stores.
+ */
+public class ValueTimestampHeadersSerde<V> extends 
WrappingNullableSerde<ValueTimestampHeaders<V>, Void, V> {
+    public ValueTimestampHeadersSerde(final Serde<V> valueSerde) {
+        super(
+            new ValueTimestampHeadersSerializer<>(requireNonNull(valueSerde, 
"valueSerde was null").serializer()),
+            new ValueTimestampHeadersDeserializer<>(requireNonNull(valueSerde, 
"valueSerde was null").deserializer())
+        );
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
new file mode 100644
index 00000000000..390458525e1
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ *   - For null/empty headers: headersSize = 0, headersBytes is omitted (0 
bytes)
+ *   - For non-empty headers: headersSize > 0, serialized headers 
([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers 
for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final LongSerializer timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        headersSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> 
valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+        return serialize(topic, valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp(), valueTimestampHeaders.headers());
+    }
+
+    private byte[] serialize(final String topic, final V plainValue, final 
long timestamp, final Headers headers) {
+        if (plainValue == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, 
plainValue);
+
+        // Since we can't control the result of the internal serializer, we 
make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with 
Optional-like values
+        // where the Optional.empty case is serialized to null.
+        // See the discussion here: https://github.com/apache/kafka/pull/7679
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
timestamp);
+
+        // empty (byte[0]) for null/empty headers, or 
[count][header1][header2]... for non-empty
+        final byte[] rawHeaders = headersSerializer.serialize(topic, headers);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize (it 
may be 0 due to null/empty headers)
+            out.write(rawHeaders);                          // empty (byte[0]) 
for null/empty headers, or [count][header1][header2]... for non-empty
+            out.write(rawTimestamp);                        // [timestamp(8)]
+            out.write(rawValue);                            // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new SerializationException("Failed to serialize 
ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+        headersSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersSerializer never wraps a null serializer (or 
configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java
new file mode 100644
index 00000000000..277f6bcbfa3
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ValueTimestampHeadersTest {
+
+    private static final String VALUE = "test-value";
+    private static final long TIMESTAMP = 123456789L;
+
+    @Test
+    public void shouldCreateInstanceWithMake() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        assertNotNull(valueTimestampHeaders);
+        assertEquals(VALUE, valueTimestampHeaders.value());
+        assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+        assertEquals(headers, valueTimestampHeaders.headers());
+    }
+
+    @Test
+    public void shouldReturnNullWhenValueIsNullWithMake() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(null, TIMESTAMP, headers);
+
+        assertNull(valueTimestampHeaders);
+    }
+
+    @Test
+    public void shouldCreateInstanceWithMakeAllowNullable() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.makeAllowNullable(VALUE, TIMESTAMP, headers);
+
+        assertNotNull(valueTimestampHeaders);
+        assertEquals(VALUE, valueTimestampHeaders.value());
+        assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+        assertEquals(headers, valueTimestampHeaders.headers());
+    }
+
+    @Test
+    public void shouldCreateInstanceWithNullValueUsingMakeAllowNullable() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.makeAllowNullable(null, TIMESTAMP, headers);
+
+        assertNotNull(valueTimestampHeaders);
+        assertNull(valueTimestampHeaders.value());
+        assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+        assertEquals(headers, valueTimestampHeaders.headers());
+    }
+
+    @Test
+    public void shouldCreateEmptyHeadersWhenHeadersAreNull() {
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, null);
+
+        assertNotNull(valueTimestampHeaders);
+        assertNotNull(valueTimestampHeaders.headers());
+        assertEquals(0, valueTimestampHeaders.headers().toArray().length);
+    }
+
+    @Test
+    public void shouldGetValueOrNull() {
+        final Headers headers = new RecordHeaders();
+        ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+        assertEquals(VALUE, 
ValueTimestampHeaders.getValueOrNull(valueTimestampHeaders));
+
+        valueTimestampHeaders = ValueTimestampHeaders.makeAllowNullable(null, 
TIMESTAMP, headers);
+        
assertNull(ValueTimestampHeaders.getValueOrNull(valueTimestampHeaders));
+
+        assertNull(ValueTimestampHeaders.getValueOrNull(null));
+    }
+
+    @Test
+    public void shouldBeEqualWhenSameValues() {
+        final Headers headers1 = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final Headers headers2 = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final ValueTimestampHeaders<String> valueTimestampHeaders1 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers1);
+        final ValueTimestampHeaders<String> valueTimestampHeaders2 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers2);
+
+        assertEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+        assertEquals(valueTimestampHeaders1.hashCode(), 
valueTimestampHeaders2.hashCode());
+    }
+
+    @Test
+    public void shouldNotBeEqualWhenDifferentValues() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final ValueTimestampHeaders<String> valueTimestampHeaders1 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+        final ValueTimestampHeaders<String> valueTimestampHeaders2 = 
ValueTimestampHeaders.make("different", TIMESTAMP, headers);
+
+        assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+    }
+
+    @Test
+    public void shouldNotBeEqualWhenDifferentTimestamps() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final ValueTimestampHeaders<String> valueTimestampHeaders1 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+        final ValueTimestampHeaders<String> valueTimestampHeaders2 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP + 1, headers);
+
+        assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+    }
+
+    @Test
+    public void shouldNotBeEqualWhenDifferentHeaders() {
+        final Headers headers1 = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final Headers headers2 = new RecordHeaders().add("key2", 
"value2".getBytes());
+
+        final ValueTimestampHeaders<String> valueTimestampHeaders1 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers1);
+        final ValueTimestampHeaders<String> valueTimestampHeaders2 = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers2);
+
+        assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+    }
+
+    @Test
+    public void shouldBeEqualToItself() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        assertEquals(valueTimestampHeaders, valueTimestampHeaders);
+    }
+
+    @Test
+    public void shouldHaveCorrectToString() {
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders = 
ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        final String toString = valueTimestampHeaders.toString();
+        assertNotNull(toString);
+        assertTrue(toString.contains("value=" + VALUE));
+        assertTrue(toString.contains("timestamp=" + TIMESTAMP));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index 2a36d34c564..7737c60290a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Iterator;
+
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -124,4 +126,36 @@ public class HeadersDeserializerTest {
         assertEquals("key1", header.key());
         assertArrayEquals(new byte[0], header.value());
     }
+
+    @Test
+    public void shouldAllowDuplicateKeys() {
+        final Headers original = new RecordHeaders()
+            .add("key0", "value0".getBytes())
+            .add("key0", "value0".getBytes())
+            .add("key1", "value1".getBytes())
+            .add("key2", "value2".getBytes())
+            .add("key2", "value3".getBytes());
+        final byte[] serialized = serializer.serialize("", original);
+        final Headers deserialized = deserializer.deserialize("", serialized);
+        assertNotNull(deserialized);
+
+        final Header[] headerArray = deserialized.toArray();
+        assertEquals(5, headerArray.length);
+        final Iterator<Header> iterator = deserialized.iterator();
+        Header next = iterator.next();
+        assertEquals("key0", next.key());
+        assertArrayEquals("value0".getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key0", next.key());
+        assertArrayEquals("value0".getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key1", next.key());
+        assertArrayEquals("value1".getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key2", next.key());
+        assertArrayEquals("value2".getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key2", next.key());
+        assertArrayEquals("value3".getBytes(), next.value());
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index dc0afdec7b0..bb2ab983475 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -129,29 +129,4 @@ public class HeadersSerializerTest {
         assertEquals("key1", header.key());
         assertArrayEquals(new byte[0], header.value());
     }
-
-    @Test
-    public void shouldSerializeHeadersWithSpecialCharacters() {
-        final Headers headers = new RecordHeaders()
-            .add("key-with-dash", "value".getBytes())
-            .add("key.with.dots", "value".getBytes())
-            .add("key_with_underscores", "value".getBytes());
-        final byte[] serialized = serializer.serialize("", headers);
-
-        assertNotNull(serialized);
-        assertTrue(serialized.length > 0);
-
-        final Headers deserialized = deserializer.deserialize("", serialized);
-        assertNotNull(deserialized);
-        assertEquals(3, deserialized.toArray().length);
-
-        assertNotNull(deserialized.lastHeader("key-with-dash"));
-        assertArrayEquals("value".getBytes(), 
deserialized.lastHeader("key-with-dash").value());
-
-        assertNotNull(deserialized.lastHeader("key.with.dots"));
-        assertArrayEquals("value".getBytes(), 
deserialized.lastHeader("key.with.dots").value());
-
-        assertNotNull(deserialized.lastHeader("key_with_underscores"));
-        assertArrayEquals("value".getBytes(), 
deserialized.lastHeader("key_with_underscores").value());
-    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
new file mode 100644
index 00000000000..806755b213c
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ValueTimestampHeadersDeserializerTest {
+
+    private static final String TOPIC = "test-topic";
+    private ValueTimestampHeadersSerializer<String> serializer;
+    private ValueTimestampHeadersDeserializer<String> deserializer;
+
+    @BeforeEach
+    void setup() {
+        serializer = new 
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+        deserializer = new 
ValueTimestampHeadersDeserializer<>(Serdes.String().deserializer());
+    }
+
+    @AfterEach
+    void cleanup() {
+        if (serializer != null) {
+            serializer.close();
+        }
+        if (deserializer != null) {
+            deserializer.close();
+        }
+    }
+
+    @Test
+    public void shouldDeserializeNullToNull() {
+        final ValueTimestampHeaders<String> result = 
deserializer.deserialize(TOPIC, null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldDeserializeWithEmptyHeaders() {
+        final Headers headers = new RecordHeaders();
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final ValueTimestampHeaders<String> deserialized = 
deserializer.deserialize(TOPIC, serialized);
+
+        assertNotNull(deserialized);
+        assertEquals("test-value", deserialized.value());
+        assertEquals(123456789L, deserialized.timestamp());
+        assertNotNull(deserialized.headers());
+        assertEquals(0, deserialized.headers().toArray().length);
+    }
+
+    @Test
+    public void shouldDeserializeWithSingleHeader() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final ValueTimestampHeaders<String> deserialized = 
deserializer.deserialize(TOPIC, serialized);
+
+        assertNotNull(deserialized);
+        assertEquals("test-value", deserialized.value());
+        assertEquals(123456789L, deserialized.timestamp());
+
+        final Headers deserializedHeaders = deserialized.headers();
+        assertNotNull(deserializedHeaders);
+        assertEquals(1, deserializedHeaders.toArray().length);
+
+        final Header header = deserializedHeaders.lastHeader("key1");
+        assertNotNull(header);
+        assertEquals("key1", header.key());
+        assertArrayEquals("value1".getBytes(), header.value());
+    }
+
+    @Test
+    public void shouldDeserializeWithMultipleHeaders() {
+        final Headers headers = new RecordHeaders()
+            .add("key0", "value0".getBytes())
+            .add("key0", "value1".getBytes())
+            .add("key2", "value2".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final ValueTimestampHeaders<String> deserialized = 
deserializer.deserialize(TOPIC, serialized);
+
+        assertNotNull(deserialized);
+        assertEquals("test-value", deserialized.value());
+        assertEquals(123456789L, deserialized.timestamp());
+
+        final Headers deserializedHeaders = deserialized.headers();
+        assertNotNull(deserializedHeaders);
+        final Header[] headersArray = deserializedHeaders.toArray();
+        assertEquals(3, headersArray.length);
+        final Iterator<Header> iterator = headers.iterator();
+        Header next = iterator.next();
+        assertEquals("key0", next.key());
+        assertArrayEquals(("value0").getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key0", next.key());
+        assertArrayEquals(("value1").getBytes(), next.value());
+        next = iterator.next();
+        assertEquals("key2", next.key());
+        assertArrayEquals(("value2").getBytes(), next.value());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldReturnNullWhenSerializingNullValue() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.makeAllowNullable(null, 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        assertNull(serialized, "Serializer should return null when value is 
null");
+
+        final ValueTimestampHeaders<String> deserialized = 
deserializer.deserialize(TOPIC, serialized);
+        assertNull(deserialized, "Deserializer should return null when input 
is null");
+    }
+
+    @Test
+    public void shouldDeserializeWithHeaderContainingNullValue() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", null);
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final ValueTimestampHeaders<String> deserialized = 
deserializer.deserialize(TOPIC, serialized);
+
+        assertNotNull(deserialized);
+        assertEquals("test-value", deserialized.value());
+        assertEquals(123456789L, deserialized.timestamp());
+
+        final Header header = deserialized.headers().lastHeader("key1");
+        assertNotNull(header);
+        assertEquals("key1", header.key());
+        assertNull(header.value());
+    }
+
+    @Test
+    public void shouldExtractValue() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+
+        try (final Serde<String> stringSerde = Serdes.String()) {
+            final String value = 
ValueTimestampHeadersDeserializer.value(serialized, stringSerde.deserializer());
+            assertNotNull(value);
+            assertEquals("test-value", value);
+        }
+    }
+
+    @Test
+    public void shouldReturnNullForRawValueWhenInputIsNull() {
+        final ValueTimestampHeaders<String> value = 
ValueTimestampHeadersDeserializer.value(null, deserializer);
+        assertNull(value);
+    }
+
+    @Test
+    public void shouldExtractTimestamp() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final long timestamp = 
ValueTimestampHeadersDeserializer.timestamp(serialized);
+
+        assertEquals(123456789L, timestamp);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenExtractingTimestampFromNull() {
+        // ByteBuffer.wrap() throws NullPointerException for null input
+        assertThrows(NullPointerException.class, () ->
+            ValueTimestampHeadersDeserializer.timestamp(null)
+        );
+    }
+
+    @Test
+    public void shouldExtractHeaders() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes())
+            .add("key2", "value2".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final Headers extractedHeaders = 
ValueTimestampHeadersDeserializer.headers(serialized);
+
+        assertNotNull(extractedHeaders);
+        assertEquals(2, extractedHeaders.toArray().length);
+        assertArrayEquals("value1".getBytes(), 
extractedHeaders.lastHeader("key1").value());
+        assertArrayEquals("value2".getBytes(), 
extractedHeaders.lastHeader("key2").value());
+    }
+
+    @Test
+    public void shouldExtractEmptyHeaders() {
+        final Headers headers = new RecordHeaders();
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        final Headers extractedHeaders = 
ValueTimestampHeadersDeserializer.headers(serialized);
+
+        assertNotNull(extractedHeaders);
+        assertEquals(0, extractedHeaders.toArray().length);
+    }
+
+    @Test
+    public void shouldReturnNullWhenExtractingHeadersFromNull() {
+        final Headers headers = 
ValueTimestampHeadersDeserializer.headers(null);
+        assertNull(headers);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenDataIsTooShort() {
+        // Create malformed data: only headersSize varint, no actual headers 
or timestamp
+        final byte[] malformedData = new byte[] {0x02};  // headersSize = 1 
but no data follows
+
+        assertThrows(SerializationException.class, () ->
+            deserializer.deserialize(TOPIC, malformedData),
+            "Should throw SerializationException for malformed data"
+        );
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenHeadersSizeIsInconsistent() {
+        // Create data with headersSize = 10 but not enough actual data
+        final byte[] malformedData = new byte[] {
+            0x14,  // headersSize = 10 (ZigZag encoding)
+            0x00, 0x00  // Only 2 bytes when 10 + 8 (timestamp) are expected
+        };
+
+        assertThrows(SerializationException.class, () ->
+            deserializer.deserialize(TOPIC, malformedData),
+            "Should throw SerializationException when buffer doesn't have 
enough data"
+        );
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
new file mode 100644
index 00000000000..7326696cc12
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ValueTimestampHeadersSerializerTest {
+
+    private static final String TOPIC = "test-topic";
+    private static final long TIMESTAMP = 123456789L;
+    private static final String VALUE = "test-value";
+
+    private ValueTimestampHeadersSerializer<String> serializer;
+    private ValueTimestampHeadersDeserializer<String> deserializer;
+
+    @BeforeEach
+    void setup() {
+        serializer = new 
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+        deserializer = new 
ValueTimestampHeadersDeserializer<>(Serdes.String().deserializer());
+    }
+
+    @AfterEach
+    void cleanup() {
+        if (serializer != null) {
+            serializer.close();
+        }
+        if (deserializer != null) {
+            deserializer.close();
+        }
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeNonNullData() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> original =
+            ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, original);
+        assertNotNull(serialized);
+
+        final ValueTimestampHeaders<String> deserialized =
+            deserializer.deserialize(TOPIC, serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(original.value(), deserialized.value());
+        assertEquals(original.timestamp(), deserialized.timestamp());
+        assertArrayEquals(original.headers().toArray(), 
deserialized.headers().toArray());
+    }
+
+    @Test
+    public void shouldSerializeNullDataAsNull() {
+        final byte[] serialized = serializer.serialize(TOPIC, null);
+        assertNull(serialized);
+    }
+
+    @Test
+    public void shouldSerializeValueWithEmptyHeaders() {
+        final Headers emptyHeaders = new RecordHeaders();
+        final ValueTimestampHeaders<String> valueTimestampHeaders =
+            ValueTimestampHeaders.make(VALUE, TIMESTAMP, emptyHeaders);
+
+        final byte[] serialized = serializer.serialize(TOPIC, 
valueTimestampHeaders);
+        assertNotNull(serialized);
+
+        final ValueTimestampHeaders<String> deserialized =
+            deserializer.deserialize(TOPIC, serialized);
+
+        assertEquals(VALUE, deserialized.value());
+        assertEquals(TIMESTAMP, deserialized.timestamp());
+        assertEquals(0, deserialized.headers().toArray().length);
+    }
+
+    @Test
+    public void shouldSerializeValueWithMultipleHeaders() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes())
+            .add("key1", "value2".getBytes())
+            .add("key3", "value3".getBytes());
+        final ValueTimestampHeaders<String> valueTimestampHeaders =
+            ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+        final byte[] serialized = serializer.serialize(TOPIC, 
valueTimestampHeaders);
+        assertNotNull(serialized);
+
+        final ValueTimestampHeaders<String> deserialized =
+            deserializer.deserialize(TOPIC, serialized);
+
+        assertEquals(VALUE, deserialized.value());
+        assertEquals(TIMESTAMP, deserialized.timestamp());
+        assertEquals(3, deserialized.headers().toArray().length);
+    }
+
+    @Test
+    public void shouldReturnNullWhenSerializingNullValue() {
+        final ValueTimestampHeaders<String> valueTimestampHeaders =
+            ValueTimestampHeaders.makeAllowNullable(null, TIMESTAMP, new 
RecordHeaders());
+        final byte[] serialized = serializer.serialize(TOPIC, 
valueTimestampHeaders);
+        assertNull(serialized);
+    }
+}

Reply via email to