This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b46af69aa4 KAFKA-20158: Add AggregationWithHeaders, serialization 
support and tests (1/N) (#21511)
7b46af69aa4 is described below

commit 7b46af69aa495ea612e1b771e8f5b1b5b89862f8
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Feb 24 09:36:36 2026 -0500

    KAFKA-20158: Add AggregationWithHeaders, serialization support and tests 
(1/N) (#21511)
    
    This PR introduces `AggregationWithHeaders` and the serialization support
    introduced in KIP-1271 for storing session aggregations with headers.
---
 .../streams/state/AggregationWithHeaders.java      | 114 ++++++++++++++++++
 .../AggregationWithHeadersDeserializer.java        | 128 ++++++++++++++++++++
 .../AggregationWithHeadersSerializer.java          | 106 +++++++++++++++++
 .../streams/state/AggregationWithHeadersTest.java  | 130 +++++++++++++++++++++
 .../AggregationWithHeadersDeserializerTest.java    | 119 +++++++++++++++++++
 .../AggregationWithHeadersSerializerTest.java      |  93 +++++++++++++++
 6 files changed, 690 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
new file mode 100644
index 00000000000..0e1173c2ed2
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+
+import java.util.Objects;
+
/**
 * Combines an aggregated value with its associated record headers.
 * This is used by SessionStoreWithHeaders to store session aggregations along with headers.
 * <p>
 * The stored {@code headers} reference is kept as-is (no defensive copy), so callers should
 * not mutate the {@link Headers} instance after handing it over.
 *
 * @param <AGG> the aggregation type
 */
public final class AggregationWithHeaders<AGG> {

    private final AGG aggregation;
    private final Headers headers;

    // Private on purpose: construction goes through make(...) / makeAllowNullable(...),
    // which encode the two null-handling policies for the aggregation. Headers are
    // always required to be non-null.
    private AggregationWithHeaders(final AGG aggregation, final Headers headers) {
        Objects.requireNonNull(headers, "headers must not be null");
        this.aggregation = aggregation;
        this.headers = headers;
    }

    /**
     * Create a new {@link AggregationWithHeaders} instance if the provided {@code aggregation} is not {@code null}.
     *
     * @param aggregation the aggregation
     * @param headers     the headers; must not be {@code null}
     * @param <AGG>       the type of the aggregation
     * @return a new {@link AggregationWithHeaders} instance if the provided {@code aggregation} is not {@code null};
     * otherwise {@code null} is returned
     * @throws NullPointerException if {@code aggregation} is not {@code null} but {@code headers} is {@code null}
     */
    public static <AGG> AggregationWithHeaders<AGG> make(final AGG aggregation, final Headers headers) {
        if (aggregation == null) {
            return null;
        }
        return new AggregationWithHeaders<>(aggregation, headers);
    }

    /**
     * Create a new {@link AggregationWithHeaders} instance.
     * The provided {@code aggregation} may be {@code null}.
     *
     * @param aggregation the aggregation (may be {@code null})
     * @param headers     the headers; must not be {@code null}
     * @param <AGG>       the type of the aggregation
     * @return a new {@link AggregationWithHeaders} instance
     * @throws NullPointerException if {@code headers} is {@code null}
     */
    public static <AGG> AggregationWithHeaders<AGG> makeAllowNullable(final AGG aggregation, final Headers headers) {
        return new AggregationWithHeaders<>(aggregation, headers);
    }

    /**
     * Return the wrapped {@code aggregation} of the given {@code aggregationWithHeaders} parameter
     * if the parameter is not {@code null}.
     *
     * @param aggregationWithHeaders an {@link AggregationWithHeaders} instance; can be {@code null}
     * @param <AGG>                  the type of the aggregation
     * @return the wrapped {@code aggregation} of {@code aggregationWithHeaders} if not {@code null}; otherwise {@code null}
     */
    public static <AGG> AGG getAggregationOrNull(final AggregationWithHeaders<AGG> aggregationWithHeaders) {
        return aggregationWithHeaders == null ? null : aggregationWithHeaders.aggregation;
    }

    /**
     * @return the wrapped aggregation; may be {@code null} if created via {@link #makeAllowNullable}
     */
    public AGG aggregation() {
        return aggregation;
    }

    /**
     * @return the associated record headers; never {@code null}
     */
    public Headers headers() {
        return headers;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AggregationWithHeaders)) {
            return false;
        }
        final AggregationWithHeaders<?> that = (AggregationWithHeaders<?>) o;
        return Objects.equals(aggregation, that.aggregation)
            && Objects.equals(this.headers, that.headers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(aggregation, headers);
    }

    @Override
    public String toString() {
        return "AggregationWithHeaders{" +
            "aggregation=" + aggregation +
            ", headers=" + headers +
            '}';
    }
}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
new file mode 100644
index 00000000000..f28078947ac
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for AggregationWithHeaders.
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][aggregation]
+ * <p>
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ *   - For null/empty headers: headersSize = 0, headersBytes is omitted (0 
bytes)
+ *   - For non-empty headers: headersSize > 0, serialized headers in the 
format [count(varint)][header1][header2]... to be processed by 
HeadersDeserializer.
+ * - aggregation: Serialized aggregation to be deserialized with the provided 
aggregation deserializer
+ * <p>
+ * This is used by KIP-1271 to deserialize aggregations with headers from 
session state stores.
+ */
+class AggregationWithHeadersDeserializer<AGG> implements 
WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
+
+    public final Deserializer<AGG> aggregationDeserializer;
+
+    AggregationWithHeadersDeserializer(final Deserializer<AGG> 
aggregationDeserializer) {
+        Objects.requireNonNull(aggregationDeserializer);
+        this.aggregationDeserializer = aggregationDeserializer;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        aggregationDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public AggregationWithHeaders<AGG> deserialize(final String topic, final 
byte[] aggregationWithHeaders) {
+        if (aggregationWithHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+        final Headers headers = readHeaders(buffer);
+        final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
+        final AGG aggregation = aggregationDeserializer.deserialize(topic, 
headers, rawAggregation);
+
+        return AggregationWithHeaders.makeAllowNullable(aggregation, headers);
+    }
+
+    @Override
+    public void close() {
+        aggregationDeserializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        initNullableDeserializer(aggregationDeserializer, getter);
+    }
+
+
+    /**
+     * Reads the specified number of bytes from the buffer with validation.
+     *
+     * @param buffer the ByteBuffer to read from
+     * @param length the number of bytes to read
+     * @return the byte array containing the read bytes
+     * @throws SerializationException if buffer doesn't have enough bytes or 
length is negative
+     */
+    private static byte[] readBytes(final ByteBuffer buffer, final int length) 
{
+        if (length < 0) {
+            throw new SerializationException(
+                "Invalid AggregationWithHeaders format: negative length " + 
length
+            );
+        }
+        if (buffer.remaining() < length) {
+            throw new SerializationException(
+                "Invalid AggregationWithHeaders format: expected " + length +
+                    " bytes but only " + buffer.remaining() + " bytes 
remaining"
+            );
+        }
+        final byte[] bytes = new byte[length];
+        buffer.get(bytes);
+        return bytes;
+    }
+
+    /**
+     * Extract headers from serialized AggregationWithHeaders.
+     */
+    static Headers headers(final byte[] rawAggregationWithHeaders) {
+        if (rawAggregationWithHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
+        return readHeaders(buffer);
+    }
+
+    private static Headers readHeaders(final ByteBuffer buffer) {
+        final int headersSize = ByteUtils.readVarint(buffer);
+        final byte[] rawHeaders = readBytes(buffer, headersSize);
+        return HeadersDeserializer.deserialize(rawHeaders);
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
new file mode 100644
index 00000000000..1781d1fa970
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
/**
 * Serializer for AggregationWithHeaders.
 * <p>
 * Serialization format (per KIP-1271):
 * [headersSize(varint)][headersBytes][aggregation]
 * <p>
 * Where:
 * - headersSize: Size of the headersBytes section in bytes, encoded as varint
 * - headersBytes:
 *   - For null/empty headers: headersSize = 0, headersBytes is omitted (0 bytes)
 *   - For non-empty headers: headersSize > 0, serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
 * - aggregation: Serialized aggregation using the provided aggregation serializer
 * <p>
 * This is used by KIP-1271 to serialize aggregations with headers for session state stores.
 */
class AggregationWithHeadersSerializer<AGG> implements WrappingNullableSerializer<AggregationWithHeaders<AGG>, Void, AGG> {
    public final Serializer<AGG> aggregationSerializer;

    AggregationWithHeadersSerializer(final Serializer<AGG> aggregationSerializer) {
        Objects.requireNonNull(aggregationSerializer);
        this.aggregationSerializer = aggregationSerializer;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Forward configuration to the wrapped aggregation serializer.
        aggregationSerializer.configure(configs, isKey);
    }

    /**
     * Serialize the given aggregation-with-headers wrapper into the KIP-1271 wire format.
     *
     * @param aggregationWithHeaders the wrapper to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} if the wrapper, its aggregation,
     *         or the serialized aggregation is {@code null}
     */
    @Override
    public byte[] serialize(final String topic, final AggregationWithHeaders<AGG> aggregationWithHeaders) {
        if (aggregationWithHeaders == null) {
            return null;
        }
        return serialize(topic, aggregationWithHeaders.aggregation(), aggregationWithHeaders.headers());
    }

    // Serializes the unwrapped aggregation and headers. A null aggregation (or a null
    // result from the wrapped serializer) yields null overall, matching tombstone semantics.
    private byte[] serialize(final String topic, final AGG plainAggregation, final Headers headers) {
        if (plainAggregation == null) {
            return null;
        }

        // Headers are passed through so header-aware serializers can use them.
        final byte[] rawAggregation = aggregationSerializer.serialize(topic, headers, plainAggregation);

        if (rawAggregation == null) {
            return null;
        }

        final byte[] rawHeaders = HeadersSerializer.serialize(headers);

        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(baos)) {

            // Layout: [headersSize(varint)][headersBytes][aggregation]
            ByteUtils.writeVarint(rawHeaders.length, out);
            out.write(rawHeaders);
            out.write(rawAggregation);

            return baos.toByteArray();
        } catch (final IOException e) {
            throw new SerializationException("Failed to serialize AggregationWithHeaders on topic: " + topic, e);
        }
    }

    @Override
    public void close() {
        aggregationSerializer.close();
    }

    @Override
    public void setIfUnset(final SerdeGetter getter) {
        initNullableSerializer(aggregationSerializer, getter);
    }
}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
new file mode 100644
index 00000000000..c7a6d025180
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AggregationWithHeadersTest {
+
+    @Test
+    public void shouldCreateAggregationWithHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        assertNotNull(aggregationWithHeaders);
+        assertEquals(aggregation, aggregationWithHeaders.aggregation());
+        assertEquals(headers, aggregationWithHeaders.headers());
+    }
+
+    @Test
+    public void shouldReturnNullForNullAggregation() {
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(null, new RecordHeaders());
+        assertNull(aggregationWithHeaders);
+    }
+
+    @Test
+    public void shouldNotCreateWithNullHeaders() {
+        final Long aggregation = 100L;
+        assertThrows(NullPointerException.class, () -> 
AggregationWithHeaders.make(aggregation, null));
+    }
+
+    @Test
+    public void shouldAllowNullableAggregation() {
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.makeAllowNullable(null, new RecordHeaders());
+
+        assertNotNull(aggregationWithHeaders);
+        assertNull(aggregationWithHeaders.aggregation());
+    }
+
+    @Test
+    public void shouldGetAggregationOrNull() {
+        final Long aggregation = 100L;
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, new RecordHeaders());
+
+        assertEquals(aggregation, 
AggregationWithHeaders.getAggregationOrNull(aggregationWithHeaders));
+        assertNull(AggregationWithHeaders.getAggregationOrNull(null));
+    }
+
+    @Test
+    public void shouldImplementEquals() {
+        final Long aggregation = 100L;
+        final Headers headers1 = new RecordHeaders();
+        headers1.add("key1", "value1".getBytes());
+
+        final Headers headers2 = new RecordHeaders();
+        headers2.add("key1", "value1".getBytes());
+
+        final AggregationWithHeaders<Long> aggregationWithHeaders1 = 
AggregationWithHeaders.make(aggregation, headers1);
+        final AggregationWithHeaders<Long> aggregationWithHeaders2 = 
AggregationWithHeaders.make(aggregation, headers2);
+
+        assertEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+        assertEquals(aggregationWithHeaders1.hashCode(), 
aggregationWithHeaders2.hashCode());
+    }
+
+    @Test
+    public void shouldNotBeEqualWithDifferentAggregations() {
+        final Headers headers = new RecordHeaders();
+
+        final AggregationWithHeaders<Long> aggregationWithHeaders1 = 
AggregationWithHeaders.make(100L, headers);
+        final AggregationWithHeaders<Long> aggregationWithHeaders2 = 
AggregationWithHeaders.make(200L, headers);
+
+        assertNotEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+    }
+
+    @Test
+    public void shouldNotBeEqualWithDifferentHeaders() {
+        final Long aggregation = 100L;
+
+        final Headers headers1 = new RecordHeaders();
+        headers1.add("key1", "value1".getBytes());
+
+        final Headers headers2 = new RecordHeaders();
+        headers2.add("key2", "value2".getBytes());
+
+        final AggregationWithHeaders<Long> aggregationWithHeaders1 = 
AggregationWithHeaders.make(aggregation, headers1);
+        final AggregationWithHeaders<Long> aggregationWithHeaders2 = 
AggregationWithHeaders.make(aggregation, headers2);
+
+        assertNotEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+    }
+
+    @Test
+    public void shouldImplementToString() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+        final String toString = aggregationWithHeaders.toString();
+
+        assertNotNull(toString);
+        assertTrue(toString.contains("aggregation=100"));
+        assertTrue(toString.contains("headers="));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
new file mode 100644
index 00000000000..d517f7387a2
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AggregationWithHeadersDeserializerTest {
+
+    private final Deserializer<Long> longDeserializer = 
Serdes.Long().deserializer();
+    private final AggregationWithHeadersDeserializer<Long> deserializer = new 
AggregationWithHeadersDeserializer<>(longDeserializer);
+
+    @Test
+    public void shouldDeserializeNullAsNull() {
+        final AggregationWithHeaders<Long> result = 
deserializer.deserialize("topic", null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldDeserializeAggregationWithEmptyHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+        final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        final AggregationWithHeaders<Long> result = 
deserializer.deserialize("topic", serialized);
+
+        assertNotNull(result);
+        assertEquals(aggregation, result.aggregation());
+        assertNotNull(result.headers());
+    }
+
+    @Test
+    public void shouldDeserializeAggregationWithHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        headers.add("key2", "value2".getBytes());
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+        final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        final AggregationWithHeaders<Long> result = 
deserializer.deserialize("topic", serialized);
+
+        assertNotNull(result);
+        assertEquals(aggregation, result.aggregation());
+        assertNotNull(result.headers());
+
+        final Iterator<Header> iterator = result.headers().iterator();
+        final Header header1 = iterator.next();
+        assertEquals("key1", header1.key());
+        assertArrayEquals("value1".getBytes(), header1.value());
+
+        final Header header2 = iterator.next();
+        assertEquals("key2", header2.key());
+        assertArrayEquals("value2".getBytes(), header2.value());
+    }
+
+    @Test
+    public void shouldThrowOnInvalidFormat() {
+        final byte[] invalidData = new byte[]{0x01, 0x02};
+        assertThrows(SerializationException.class, () -> 
deserializer.deserialize("topic", invalidData));
+    }
+
+    @Test
+    public void shouldExtractHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+        final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        final Headers extractedHeaders = 
AggregationWithHeadersDeserializer.headers(serialized);
+        assertNotNull(extractedHeaders);
+
+        final Header header = extractedHeaders.iterator().next();
+        assertEquals("key1", header.key());
+        assertArrayEquals("value1".getBytes(), header.value());
+    }
+
+    @Test
+    public void shouldReturnNullForNullInput() {
+        assertNull(AggregationWithHeadersDeserializer.headers(null));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
new file mode 100644
index 00000000000..d7a61d0378d
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AggregationWithHeadersSerializerTest {
+
+    private final Serializer<Long> longSerializer = Serdes.Long().serializer();
+    private final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(longSerializer);
+
+    @Test
+    public void shouldSerializeNullAsNull() {
+        final byte[] result = serializer.serialize("topic", null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldSerializeAggregationWithEmptyHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final byte[] result = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        assertNotNull(result);
+        assertTrue(result.length > 0);
+    }
+
+    @Test
+    public void shouldSerializeAggregationWithHeaders() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        headers.add("key2", "value2".getBytes());
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final byte[] result = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        assertNotNull(result);
+        assertTrue(result.length > 0);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeConsistently() {
+        final Long aggregation = 100L;
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        headers.add("key2", null);
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
+
+        final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
+        final AggregationWithHeadersDeserializer<Long> deserializer = new 
AggregationWithHeadersDeserializer<>(Serdes.Long().deserializer());
+        final AggregationWithHeaders<Long> deserialized = 
deserializer.deserialize("topic", serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(aggregation, deserialized.aggregation());
+        assertNotNull(deserialized.headers());
+    }
+
+    @Test
+    public void shouldHandleNullAggregationInAggregationWithHeaders() {
+        final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.makeAllowNullable(null, new RecordHeaders());
+        final byte[] result = serializer.serialize("topic", 
aggregationWithHeaders);
+
+        assertNull(result);
+    }
+}
\ No newline at end of file

Reply via email to