This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45b5df2bc7d KAFKA-20121: Create ValueTimestampHeaders and its serializer/deserializer (#21408)
45b5df2bc7d is described below
commit 45b5df2bc7d0d1ed2e40b3c9e0aff2edf8fdda94
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Feb 10 18:46:45 2026 +0000
KAFKA-20121: Create ValueTimestampHeaders and its serializer/deserializer (#21408)
This PR adds the `ValueTimestampHeaders` class and its related serializer/deserializer as infrastructure for KIP-1271.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax <[email protected]>
---
.../kafka/streams/state/ValueTimestampHeaders.java | 127 ++++++++++
.../state/internals/HeadersDeserializer.java | 6 +
.../streams/state/internals/HeadersSerializer.java | 3 +-
.../ValueTimestampHeadersDeserializer.java | 167 +++++++++++++
.../internals/ValueTimestampHeadersSerde.java | 40 +++
.../internals/ValueTimestampHeadersSerializer.java | 128 ++++++++++
.../streams/state/ValueTimestampHeadersTest.java | 158 ++++++++++++
.../state/internals/HeadersDeserializerTest.java | 34 +++
.../state/internals/HeadersSerializerTest.java | 25 --
.../ValueTimestampHeadersDeserializerTest.java | 278 +++++++++++++++++++++
.../ValueTimestampHeadersSerializerTest.java | 127 ++++++++++
11 files changed, 1067 insertions(+), 26 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java
new file mode 100644
index 00000000000..6a924134867
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.util.Objects;
+
+/**
+ * Combines a value with its timestamp and associated record headers.
+ *
+ * @param <V> the value type
+ */
+public final class ValueTimestampHeaders<V> {
+
+ private final V value;
+ private final long timestamp;
+ private final Headers headers;
+
+ private ValueTimestampHeaders(final V value, final long timestamp, final Headers headers) {
+ this.value = value;
+ this.timestamp = timestamp;
+ this.headers = headers == null ? new RecordHeaders() : headers;
+ }
+
+ /**
+ * Create a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null}.
+ *
+ * @param value the value
+ * @param timestamp the timestamp
+ * @param headers the headers (may be {@code null}, treated as empty)
+ * @param <V> the type of the value
+ * @return a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null};
+ * otherwise {@code null} is returned
+ */
+ public static <V> ValueTimestampHeaders<V> make(final V value,
+ final long timestamp,
+ final Headers headers) {
+ if (value == null) {
+ return null;
+ }
+ return new ValueTimestampHeaders<>(value, timestamp, headers);
+ }
+
+ /**
+ * Create a new {@link ValueTimestampHeaders} instance.
+ * The provided {@code value} may be {@code null}.
+ *
+ * @param value the value (may be {@code null})
+ * @param timestamp the timestamp
+ * @param headers the headers (may be {@code null}, treated as empty)
+ * @param <V> the type of the value
+ * @return a new {@link ValueTimestampHeaders} instance
+ */
+ public static <V> ValueTimestampHeaders<V> makeAllowNullable(final V value,
+ final long timestamp,
+ final Headers headers) {
+ return new ValueTimestampHeaders<>(value, timestamp, headers);
+ }
+
+ /**
+ * Return the wrapped {@code value} of the given {@code valueTimestampHeaders} parameter
+ * if the parameter is not {@code null}.
+ *
+ * @param valueTimestampHeaders a {@link ValueTimestampHeaders} instance; can be {@code null}
+ * @param <V> the type of the value
+ * @return the wrapped {@code value} of {@code valueTimestampHeaders} if not {@code null}; otherwise {@code null}
+ */
+ public static <V> V getValueOrNull(final ValueTimestampHeaders<V> valueTimestampHeaders) {
+ return valueTimestampHeaders == null ? null : valueTimestampHeaders.value;
+ }
+
+ public V value() {
+ return value;
+ }
+
+ public long timestamp() {
+ return timestamp;
+ }
+
+ public Headers headers() {
+ return headers;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ValueTimestampHeaders)) {
+ return false;
+ }
+ final ValueTimestampHeaders<?> that = (ValueTimestampHeaders<?>) o;
+ return timestamp == that.timestamp
+ && Objects.equals(value, that.value)
+ && Objects.equals(this.headers, that.headers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, timestamp, headers);
+ }
+
+ @Override
+ public String toString() {
+ return "ValueTimestampHeaders{" +
+ "value=" + value +
+ ", timestamp=" + timestamp +
+ ", headers=" + headers +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
index 70a1107d981..88ca8848bdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
@@ -86,4 +86,10 @@ public class HeadersDeserializer implements Deserializer<Headers> {
return headers;
}
+
+ public static Headers deserialize(final byte[] data) {
+ try (HeadersDeserializer deserializer = new HeadersDeserializer()) {
+ return deserializer.deserialize("", data);
+ }
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index 91a97e78e4b..66f2c4e0da9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
@@ -93,7 +94,7 @@ public class HeadersSerializer implements Serializer<Headers> {
return baos.toByteArray();
} catch (IOException e) {
- throw new RuntimeException("Failed to serialize headers", e);
+ throw new SerializationException("Failed to serialize headers", e);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
new file mode 100644
index 00000000000..bbd91ae247b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ * - For null/empty headers: headersSize = 0, headersBytes is omitted (0 bytes)
+ * - For non-empty headers: headersSize > 0, serialized headers in the format [count(varint)][header1][header2]... to be processed by HeadersDeserializer.
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+ private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+ private static final HeadersDeserializer HEADERS_DESERIALIZER = new HeadersDeserializer();
+
+ public final Deserializer<V> valueDeserializer;
+ private final LongDeserializer timestampDeserializer;
+ private final HeadersDeserializer headersDeserializer;
+
+ ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+ Objects.requireNonNull(valueDeserializer);
+ this.valueDeserializer = valueDeserializer;
+ this.timestampDeserializer = new LongDeserializer();
+ this.headersDeserializer = new HeadersDeserializer();
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ valueDeserializer.configure(configs, isKey);
+ timestampDeserializer.configure(configs, isKey);
+ headersDeserializer.configure(configs, isKey);
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] valueTimestampHeaders) {
+ if (valueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+
+ final byte[] rawHeaders = readBytes(buffer, headersSize);
+ final Headers headers = headersDeserializer.deserialize(topic, rawHeaders);
+ final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+ final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp);
+ final byte[] rawValue = readBytes(buffer, buffer.remaining());
+ final V value = valueDeserializer.deserialize(topic, rawValue);
+
+ return ValueTimestampHeaders.make(value, timestamp, headers);
+ }
+
+ @Override
+ public void close() {
+ valueDeserializer.close();
+ timestampDeserializer.close();
+ headersDeserializer.close();
+ }
+
+ @Override
+ public void setIfUnset(final SerdeGetter getter) {
+ // ValueTimestampHeadersDeserializer never wraps a null deserializer (or configure would throw),
+ // but it may wrap a deserializer that itself wraps a null deserializer.
+ initNullableDeserializer(valueDeserializer, getter);
+ }
+
+ /**
+ * Reads the specified number of bytes from the buffer with validation.
+ *
+ * @param buffer the ByteBuffer to read from
+ * @param length the number of bytes to read
+ * @return the byte array containing the read bytes
+ * @throws SerializationException if buffer doesn't have enough bytes
+ */
+ private static byte[] readBytes(final ByteBuffer buffer, final int length) {
+ if (buffer.remaining() < length) {
+ throw new SerializationException(
+ "Invalid ValueTimestampHeaders format: expected " + length + " bytes but only " + buffer.remaining() + " bytes remaining"
+ );
+ }
+ final byte[] bytes = new byte[length];
+ buffer.get(bytes);
+ return bytes;
+ }
+
+ /**
+ * Extract value from serialized ValueTimestampHeaders.
+ */
+ static <T> T value(final byte[] rawValueTimestampHeaders, final Deserializer<T> deserializer) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ // skip headers plus timestamp
+ buffer.position(buffer.position() + headersSize + Long.BYTES);
+ byte[] bytes = readBytes(buffer, buffer.remaining());
+
+ return deserializer.deserialize("", bytes);
+ }
+
+ /**
+ * Extract timestamp from serialized ValueTimestampHeaders.
+ */
+ static long timestamp(final byte[] rawValueTimestampHeaders) {
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+ return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+ }
+
+ /**
+ * Extract headers from serialized ValueTimestampHeaders.
+ */
+ static Headers headers(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ final byte[] rawHeaders = readBytes(buffer, headersSize);
+ return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java
new file mode 100644
index 00000000000..90901a6d9e0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerde.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Serde for ValueTimestampHeaders.
+ *
+ * This serde wraps a value serde and handles serialization/deserialization of
+ * values along with their timestamps and headers.
+ *
+ * This is used by KIP-1271 to support headers in state stores.
+ */
+public class ValueTimestampHeadersSerde<V> extends WrappingNullableSerde<ValueTimestampHeaders<V>, Void, V> {
+ public ValueTimestampHeadersSerde(final Serde<V> valueSerde) {
+ super(
+ new ValueTimestampHeadersSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
+ new ValueTimestampHeadersDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
+ );
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
new file mode 100644
index 00000000000..390458525e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ * - For null/empty headers: headersSize = 0, headersBytes is omitted (0 bytes)
+ * - For non-empty headers: headersSize > 0, serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+ public final Serializer<V> valueSerializer;
+ private final LongSerializer timestampSerializer;
+ private final HeadersSerializer headersSerializer;
+
+ ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+ Objects.requireNonNull(valueSerializer);
+ this.valueSerializer = valueSerializer;
+ this.timestampSerializer = new LongSerializer();
+ this.headersSerializer = new HeadersSerializer();
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ valueSerializer.configure(configs, isKey);
+ timestampSerializer.configure(configs, isKey);
+ headersSerializer.configure(configs, isKey);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final ValueTimestampHeaders<V> valueTimestampHeaders) {
+ if (valueTimestampHeaders == null) {
+ return null;
+ }
+ return serialize(topic, valueTimestampHeaders.value(), valueTimestampHeaders.timestamp(), valueTimestampHeaders.headers());
+ }
+
+ private byte[] serialize(final String topic, final V plainValue, final long timestamp, final Headers headers) {
+ if (plainValue == null) {
+ return null;
+ }
+
+ final byte[] rawValue = valueSerializer.serialize(topic, headers, plainValue);
+
+ // Since we can't control the result of the internal serializer, we make sure that the result
+ // is not null as well.
+ // Serializing non-null values to null can be useful when working with Optional-like values
+ // where the Optional.empty case is serialized to null.
+ // See the discussion here: https://github.com/apache/kafka/pull/7679
+ if (rawValue == null) {
+ return null;
+ }
+
+ final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+ // empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
+ final byte[] rawHeaders = headersSerializer.serialize(topic, headers);
+
+ // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+
+ ByteUtils.writeVarint(rawHeaders.length, out); // headersSize (it may be 0 due to null/empty headers)
+ out.write(rawHeaders); // empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
+ out.write(rawTimestamp); // [timestamp(8)]
+ out.write(rawValue); // [value]
+
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new SerializationException("Failed to serialize ValueTimestampHeaders", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ valueSerializer.close();
+ timestampSerializer.close();
+ headersSerializer.close();
+ }
+
+ @Override
+ public void setIfUnset(final SerdeGetter getter) {
+ // ValueTimestampHeadersSerializer never wraps a null serializer (or configure would throw),
+ // but it may wrap a serializer that itself wraps a null serializer.
+ initNullableSerializer(valueSerializer, getter);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java
new file mode 100644
index 00000000000..277f6bcbfa3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/ValueTimestampHeadersTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ValueTimestampHeadersTest {
+
+ private static final String VALUE = "test-value";
+ private static final long TIMESTAMP = 123456789L;
+
+ @Test
+ public void shouldCreateInstanceWithMake() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ assertNotNull(valueTimestampHeaders);
+ assertEquals(VALUE, valueTimestampHeaders.value());
+ assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+ assertEquals(headers, valueTimestampHeaders.headers());
+ }
+
+ @Test
+ public void shouldReturnNullWhenValueIsNullWithMake() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(null, TIMESTAMP, headers);
+
+ assertNull(valueTimestampHeaders);
+ }
+
+ @Test
+ public void shouldCreateInstanceWithMakeAllowNullable() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.makeAllowNullable(VALUE, TIMESTAMP, headers);
+
+ assertNotNull(valueTimestampHeaders);
+ assertEquals(VALUE, valueTimestampHeaders.value());
+ assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+ assertEquals(headers, valueTimestampHeaders.headers());
+ }
+
+ @Test
+ public void shouldCreateInstanceWithNullValueUsingMakeAllowNullable() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.makeAllowNullable(null, TIMESTAMP, headers);
+
+ assertNotNull(valueTimestampHeaders);
+ assertNull(valueTimestampHeaders.value());
+ assertEquals(TIMESTAMP, valueTimestampHeaders.timestamp());
+ assertEquals(headers, valueTimestampHeaders.headers());
+ }
+
+ @Test
+ public void shouldCreateEmptyHeadersWhenHeadersAreNull() {
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(VALUE, TIMESTAMP, null);
+
+ assertNotNull(valueTimestampHeaders);
+ assertNotNull(valueTimestampHeaders.headers());
+ assertEquals(0, valueTimestampHeaders.headers().toArray().length);
+ }
+
+ @Test
+ public void shouldGetValueOrNull() {
+ final Headers headers = new RecordHeaders();
+ ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+ assertEquals(VALUE, ValueTimestampHeaders.getValueOrNull(valueTimestampHeaders));
+
+ valueTimestampHeaders = ValueTimestampHeaders.makeAllowNullable(null, TIMESTAMP, headers);
+ assertNull(ValueTimestampHeaders.getValueOrNull(valueTimestampHeaders));
+
+ assertNull(ValueTimestampHeaders.getValueOrNull(null));
+ }
+
+ @Test
+ public void shouldBeEqualWhenSameValues() {
+ final Headers headers1 = new RecordHeaders().add("key1", "value1".getBytes());
+ final Headers headers2 = new RecordHeaders().add("key1", "value1".getBytes());
+
+ final ValueTimestampHeaders<String> valueTimestampHeaders1 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers1);
+ final ValueTimestampHeaders<String> valueTimestampHeaders2 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers2);
+
+ assertEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+ assertEquals(valueTimestampHeaders1.hashCode(), valueTimestampHeaders2.hashCode());
+ }
+
+ @Test
+ public void shouldNotBeEqualWhenDifferentValues() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+
+ final ValueTimestampHeaders<String> valueTimestampHeaders1 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+ final ValueTimestampHeaders<String> valueTimestampHeaders2 = ValueTimestampHeaders.make("different", TIMESTAMP, headers);
+
+ assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+ }
+
+ @Test
+ public void shouldNotBeEqualWhenDifferentTimestamps() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+
+ final ValueTimestampHeaders<String> valueTimestampHeaders1 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+ final ValueTimestampHeaders<String> valueTimestampHeaders2 = ValueTimestampHeaders.make(VALUE, TIMESTAMP + 1, headers);
+
+ assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+ }
+
+ @Test
+ public void shouldNotBeEqualWhenDifferentHeaders() {
+ final Headers headers1 = new RecordHeaders().add("key1", "value1".getBytes());
+ final Headers headers2 = new RecordHeaders().add("key2", "value2".getBytes());
+
+ final ValueTimestampHeaders<String> valueTimestampHeaders1 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers1);
+ final ValueTimestampHeaders<String> valueTimestampHeaders2 = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers2);
+
+ assertNotEquals(valueTimestampHeaders1, valueTimestampHeaders2);
+ }
+
+ @Test
+ public void shouldBeEqualToItself() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ assertEquals(valueTimestampHeaders, valueTimestampHeaders);
+ }
+
+ @Test
+ public void shouldHaveCorrectToString() {
+ final Headers headers = new RecordHeaders().add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders = ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ final String toString = valueTimestampHeaders.toString();
+ assertNotNull(toString);
+ assertTrue(toString.contains("value=" + VALUE));
+ assertTrue(toString.contains("timestamp=" + TIMESTAMP));
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index 2a36d34c564..7737c60290a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
+import java.util.Iterator;
+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -124,4 +126,36 @@ public class HeadersDeserializerTest {
assertEquals("key1", header.key());
assertArrayEquals(new byte[0], header.value());
}
+
+ @Test
+ public void shouldAllowDuplicateKeys() {
+ final Headers original = new RecordHeaders()
+ .add("key0", "value0".getBytes())
+ .add("key0", "value0".getBytes())
+ .add("key1", "value1".getBytes())
+ .add("key2", "value2".getBytes())
+ .add("key2", "value3".getBytes());
+ final byte[] serialized = serializer.serialize("", original);
+ final Headers deserialized = deserializer.deserialize("", serialized);
+ assertNotNull(deserialized);
+
+ final Header[] headerArray = deserialized.toArray();
+ assertEquals(5, headerArray.length);
+ final Iterator<Header> iterator = deserialized.iterator();
+ Header next = iterator.next();
+ assertEquals("key0", next.key());
+ assertArrayEquals("value0".getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key0", next.key());
+ assertArrayEquals("value0".getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key1", next.key());
+ assertArrayEquals("value1".getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key2", next.key());
+ assertArrayEquals("value2".getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key2", next.key());
+ assertArrayEquals("value3".getBytes(), next.value());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index dc0afdec7b0..bb2ab983475 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -129,29 +129,4 @@ public class HeadersSerializerTest {
assertEquals("key1", header.key());
assertArrayEquals(new byte[0], header.value());
}
-
- @Test
- public void shouldSerializeHeadersWithSpecialCharacters() {
- final Headers headers = new RecordHeaders()
- .add("key-with-dash", "value".getBytes())
- .add("key.with.dots", "value".getBytes())
- .add("key_with_underscores", "value".getBytes());
- final byte[] serialized = serializer.serialize("", headers);
-
- assertNotNull(serialized);
- assertTrue(serialized.length > 0);
-
- final Headers deserialized = deserializer.deserialize("", serialized);
- assertNotNull(deserialized);
- assertEquals(3, deserialized.toArray().length);
-
- assertNotNull(deserialized.lastHeader("key-with-dash"));
- assertArrayEquals("value".getBytes(), deserialized.lastHeader("key-with-dash").value());
-
- assertNotNull(deserialized.lastHeader("key.with.dots"));
- assertArrayEquals("value".getBytes(), deserialized.lastHeader("key.with.dots").value());
-
- assertNotNull(deserialized.lastHeader("key_with_underscores"));
- assertArrayEquals("value".getBytes(), deserialized.lastHeader("key_with_underscores").value());
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
new file mode 100644
index 00000000000..806755b213c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ValueTimestampHeadersDeserializerTest {
+
+ private static final String TOPIC = "test-topic";
+ private ValueTimestampHeadersSerializer<String> serializer;
+ private ValueTimestampHeadersDeserializer<String> deserializer;
+
+ @BeforeEach
+ void setup() {
+ serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+ deserializer = new ValueTimestampHeadersDeserializer<>(Serdes.String().deserializer());
+ }
+
+ @AfterEach
+ void cleanup() {
+ if (serializer != null) {
+ serializer.close();
+ }
+ if (deserializer != null) {
+ deserializer.close();
+ }
+ }
+
+ @Test
+ public void shouldDeserializeNullToNull() {
+ final ValueTimestampHeaders<String> result = deserializer.deserialize(TOPIC, null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldDeserializeWithEmptyHeaders() {
+ final Headers headers = new RecordHeaders();
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final ValueTimestampHeaders<String> deserialized = deserializer.deserialize(TOPIC, serialized);
+
+ assertNotNull(deserialized);
+ assertEquals("test-value", deserialized.value());
+ assertEquals(123456789L, deserialized.timestamp());
+ assertNotNull(deserialized.headers());
+ assertEquals(0, deserialized.headers().toArray().length);
+ }
+
+ @Test
+ public void shouldDeserializeWithSingleHeader() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final ValueTimestampHeaders<String> deserialized = deserializer.deserialize(TOPIC, serialized);
+
+ assertNotNull(deserialized);
+ assertEquals("test-value", deserialized.value());
+ assertEquals(123456789L, deserialized.timestamp());
+
+ final Headers deserializedHeaders = deserialized.headers();
+ assertNotNull(deserializedHeaders);
+ assertEquals(1, deserializedHeaders.toArray().length);
+
+ final Header header = deserializedHeaders.lastHeader("key1");
+ assertNotNull(header);
+ assertEquals("key1", header.key());
+ assertArrayEquals("value1".getBytes(), header.value());
+ }
+
+ @Test
+ public void shouldDeserializeWithMultipleHeaders() {
+ final Headers headers = new RecordHeaders()
+ .add("key0", "value0".getBytes())
+ .add("key0", "value1".getBytes())
+ .add("key2", "value2".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final ValueTimestampHeaders<String> deserialized = deserializer.deserialize(TOPIC, serialized);
+
+ assertNotNull(deserialized);
+ assertEquals("test-value", deserialized.value());
+ assertEquals(123456789L, deserialized.timestamp());
+
+ final Headers deserializedHeaders = deserialized.headers();
+ assertNotNull(deserializedHeaders);
+ final Header[] headersArray = deserializedHeaders.toArray();
+ assertEquals(3, headersArray.length);
+ final Iterator<Header> iterator = deserializedHeaders.iterator();
+ Header next = iterator.next();
+ assertEquals("key0", next.key());
+ assertArrayEquals(("value0").getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key0", next.key());
+ assertArrayEquals(("value1").getBytes(), next.value());
+ next = iterator.next();
+ assertEquals("key2", next.key());
+ assertArrayEquals(("value2").getBytes(), next.value());
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldReturnNullWhenSerializingNullValue() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.makeAllowNullable(null, 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ assertNull(serialized, "Serializer should return null when value is null");
+
+ final ValueTimestampHeaders<String> deserialized = deserializer.deserialize(TOPIC, serialized);
+ assertNull(deserialized, "Deserializer should return null when input is null");
+ }
+
+ @Test
+ public void shouldDeserializeWithHeaderContainingNullValue() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", null);
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final ValueTimestampHeaders<String> deserialized = deserializer.deserialize(TOPIC, serialized);
+
+ assertNotNull(deserialized);
+ assertEquals("test-value", deserialized.value());
+ assertEquals(123456789L, deserialized.timestamp());
+
+ final Header header = deserialized.headers().lastHeader("key1");
+ assertNotNull(header);
+ assertEquals("key1", header.key());
+ assertNull(header.value());
+ }
+
+ @Test
+ public void shouldExtractValue() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+
+ try (final Serde<String> stringSerde = Serdes.String()) {
+ final String value = ValueTimestampHeadersDeserializer.value(serialized, stringSerde.deserializer());
+ assertNotNull(value);
+ assertEquals("test-value", value);
+ }
+ }
+
+ @Test
+ public void shouldReturnNullForRawValueWhenInputIsNull() {
+ final ValueTimestampHeaders<String> value = ValueTimestampHeadersDeserializer.value(null, deserializer);
+ assertNull(value);
+ }
+
+ @Test
+ public void shouldExtractTimestamp() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final long timestamp = ValueTimestampHeadersDeserializer.timestamp(serialized);
+
+ assertEquals(123456789L, timestamp);
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenExtractingTimestampFromNull() {
+ // ByteBuffer.wrap() throws NullPointerException for null input
+ assertThrows(NullPointerException.class, () ->
+ ValueTimestampHeadersDeserializer.timestamp(null)
+ );
+ }
+
+ @Test
+ public void shouldExtractHeaders() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes())
+ .add("key2", "value2".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final Headers extractedHeaders = ValueTimestampHeadersDeserializer.headers(serialized);
+
+ assertNotNull(extractedHeaders);
+ assertEquals(2, extractedHeaders.toArray().length);
+ assertArrayEquals("value1".getBytes(), extractedHeaders.lastHeader("key1").value());
+ assertArrayEquals("value2".getBytes(), extractedHeaders.lastHeader("key2").value());
+ }
+
+ @Test
+ public void shouldExtractEmptyHeaders() {
+ final Headers headers = new RecordHeaders();
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make("test-value", 123456789L, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ final Headers extractedHeaders = ValueTimestampHeadersDeserializer.headers(serialized);
+
+ assertNotNull(extractedHeaders);
+ assertEquals(0, extractedHeaders.toArray().length);
+ }
+
+ @Test
+ public void shouldReturnNullWhenExtractingHeadersFromNull() {
+ final Headers headers = ValueTimestampHeadersDeserializer.headers(null);
+ assertNull(headers);
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenDataIsTooShort() {
+ // Create malformed data: only headersSize varint, no actual headers or timestamp
+ final byte[] malformedData = new byte[] {0x02}; // headersSize = 1 but no data follows
+
+ assertThrows(SerializationException.class, () ->
+ deserializer.deserialize(TOPIC, malformedData),
+ "Should throw SerializationException for malformed data"
+ );
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenHeadersSizeIsInconsistent() {
+ // Create data with headersSize = 10 but not enough actual data
+ final byte[] malformedData = new byte[] {
+ 0x14, // headersSize = 10 (ZigZag encoding)
+ 0x00, 0x00 // Only 2 bytes when 10 + 8 (timestamp) are expected
+ };
+
+ assertThrows(SerializationException.class, () ->
+ deserializer.deserialize(TOPIC, malformedData),
+ "Should throw SerializationException when buffer doesn't have enough data"
+ );
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
new file mode 100644
index 00000000000..7326696cc12
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ValueTimestampHeadersSerializerTest {
+
+ private static final String TOPIC = "test-topic";
+ private static final long TIMESTAMP = 123456789L;
+ private static final String VALUE = "test-value";
+
+ private ValueTimestampHeadersSerializer<String> serializer;
+ private ValueTimestampHeadersDeserializer<String> deserializer;
+
+ @BeforeEach
+ void setup() {
+ serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+ deserializer = new ValueTimestampHeadersDeserializer<>(Serdes.String().deserializer());
+ }
+
+ @AfterEach
+ void cleanup() {
+ if (serializer != null) {
+ serializer.close();
+ }
+ if (deserializer != null) {
+ deserializer.close();
+ }
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeNonNullData() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> original =
+ ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, original);
+ assertNotNull(serialized);
+
+ final ValueTimestampHeaders<String> deserialized =
+ deserializer.deserialize(TOPIC, serialized);
+
+ assertNotNull(deserialized);
+ assertEquals(original.value(), deserialized.value());
+ assertEquals(original.timestamp(), deserialized.timestamp());
+ assertArrayEquals(original.headers().toArray(), deserialized.headers().toArray());
+ }
+
+ @Test
+ public void shouldSerializeNullDataAsNull() {
+ final byte[] serialized = serializer.serialize(TOPIC, null);
+ assertNull(serialized);
+ }
+
+ @Test
+ public void shouldSerializeValueWithEmptyHeaders() {
+ final Headers emptyHeaders = new RecordHeaders();
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.make(VALUE, TIMESTAMP, emptyHeaders);
+
+ final byte[] serialized = serializer.serialize(TOPIC, valueTimestampHeaders);
+ assertNotNull(serialized);
+
+ final ValueTimestampHeaders<String> deserialized =
+ deserializer.deserialize(TOPIC, serialized);
+
+ assertEquals(VALUE, deserialized.value());
+ assertEquals(TIMESTAMP, deserialized.timestamp());
+ assertEquals(0, deserialized.headers().toArray().length);
+ }
+
+ @Test
+ public void shouldSerializeValueWithMultipleHeaders() {
+ final Headers headers = new RecordHeaders()
+ .add("key1", "value1".getBytes())
+ .add("key1", "value2".getBytes())
+ .add("key3", "value3".getBytes());
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.make(VALUE, TIMESTAMP, headers);
+
+ final byte[] serialized = serializer.serialize(TOPIC, valueTimestampHeaders);
+ assertNotNull(serialized);
+
+ final ValueTimestampHeaders<String> deserialized =
+ deserializer.deserialize(TOPIC, serialized);
+
+ assertEquals(VALUE, deserialized.value());
+ assertEquals(TIMESTAMP, deserialized.timestamp());
+ assertEquals(3, deserialized.headers().toArray().length);
+ }
+
+ @Test
+ public void shouldReturnNullWhenSerializingNullValue() {
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.makeAllowNullable(null, TIMESTAMP, new RecordHeaders());
+ final byte[] serialized = serializer.serialize(TOPIC, valueTimestampHeaders);
+ assertNull(serialized);
+ }
+}