This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a48123e2a33 MINOR: Simplify Header Serdes (#21514)
a48123e2a33 is described below

commit a48123e2a334702c30487a3d87025d807f9ade9f
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Feb 19 22:36:20 2026 -0800

    MINOR: Simplify Header Serdes (#21514)
    
    HeadersSerializer and HeaderDeserializer are internal classes and there
    is no reason why they would need to implement the public Serializer and
    Deserializer interfaces. If we don't implement these interfaces, the
    actual serialize() and deserialize() methods can be static simplifying
    the code further.
    
    Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
    <[email protected]>
---
 .../state/internals/HeadersDeserializer.java       | 12 ++-------
 .../streams/state/internals/HeadersSerializer.java |  7 ++---
 .../ValueTimestampHeadersDeserializer.java         |  9 ++-----
 .../internals/ValueTimestampHeadersSerializer.java |  8 ++----
 .../state/internals/HeadersDeserializerTest.java   | 31 ++++++++++------------
 .../state/internals/HeadersSerializerTest.java     | 23 +++++++---------
 6 files changed, 32 insertions(+), 58 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
index 88ca8848bdc..e5baefb9cfc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.ByteUtils;
 
 import java.nio.ByteBuffer;
@@ -41,18 +40,17 @@ import java.nio.charset.StandardCharsets;
  *
  * This is used by KIP-1271 to deserialize headers from state stores.
  */
-public class HeadersDeserializer implements Deserializer<Headers> {
+class HeadersDeserializer {
 
     /**
      * Deserializes headers from a byte array using varint encoding per 
KIP-1271.
      * <p>
      * The input format is [count][header1][header2]... without a size prefix.
      *
-     * @param topic topic associated with the data
      * @param data the serialized byte array (can be null)
      * @return the deserialized headers
      */
-    public Headers deserialize(final String topic, final byte[] data) {
+    public static Headers deserialize(final byte[] data) {
         if (data == null || data.length == 0) {
             return new RecordHeaders();
         }
@@ -86,10 +84,4 @@ public class HeadersDeserializer implements 
Deserializer<Headers> {
 
         return headers;
     }
-
-    public static Headers deserialize(final byte[] data) {
-        try (HeadersDeserializer deserializer = new HeadersDeserializer()) {
-            return deserializer.deserialize("", data);
-        }
-    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index a71048bb9b9..f8c664c7820 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.ByteUtils;
 
 import java.io.ByteArrayOutputStream;
@@ -47,7 +46,7 @@ import java.nio.charset.StandardCharsets;
  * <p>
  * This is used by KIP-1271 to serialize headers for storage in state stores.
  */
-public class HeadersSerializer implements Serializer<Headers> {
+class HeadersSerializer {
 
     /**
      * Serializes headers into a byte array using varint encoding per KIP-1271.
@@ -58,12 +57,10 @@ public class HeadersSerializer implements 
Serializer<Headers> {
      * For null or empty headers, returns an empty byte array (0 bytes)
      * instead of encoding headerCount=0 (1 byte).
      *
-     * @param topic topic associated with data
      * @param headers the headers to serialize (can be null)
      * @return the serialized byte array (empty array if headers are null or 
empty)
      */
-    @Override
-    public byte[] serialize(final String topic, final Headers headers) {
+    public static byte[] serialize(final Headers headers) {
         final Header[] headersArray = (headers == null) ? new Header[0] : 
headers.toArray();
 
         if (headersArray.length == 0) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index c268e6f0276..8dcbdcde25d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -49,24 +49,20 @@ import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.i
  */
 class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
     private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
-    private static final HeadersDeserializer HEADERS_DESERIALIZER = new 
HeadersDeserializer();
 
     public final Deserializer<V> valueDeserializer;
     private final LongDeserializer timestampDeserializer;
-    private final HeadersDeserializer headersDeserializer;
 
     ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) 
{
         Objects.requireNonNull(valueDeserializer);
         this.valueDeserializer = valueDeserializer;
         this.timestampDeserializer = new LongDeserializer();
-        this.headersDeserializer = new HeadersDeserializer();
     }
 
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
         valueDeserializer.configure(configs, isKey);
         timestampDeserializer.configure(configs, isKey);
-        headersDeserializer.configure(configs, isKey);
     }
 
     @Override
@@ -79,7 +75,7 @@ class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializ
         final int headersSize = ByteUtils.readVarint(buffer);
 
         final byte[] rawHeaders = readBytes(buffer, headersSize);
-        final Headers headers = headersDeserializer.deserialize(topic, 
rawHeaders);
+        final Headers headers = HeadersDeserializer.deserialize(rawHeaders);
         final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
         final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp);
         final byte[] rawValue = readBytes(buffer, buffer.remaining());
@@ -92,7 +88,6 @@ class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializ
     public void close() {
         valueDeserializer.close();
         timestampDeserializer.close();
-        headersDeserializer.close();
     }
 
     @Override
@@ -162,7 +157,7 @@ class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializ
         final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
         final int headersSize = ByteUtils.readVarint(buffer);
         final byte[] rawHeaders = readBytes(buffer, headersSize);
-        return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
+        return HeadersDeserializer.deserialize(rawHeaders);
     }
     /**
      * Extract raw value from serialized ValueTimestampHeaders.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
index 2686e605098..1da1ad1f65f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -49,23 +49,20 @@ import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.i
  *
  * This is used by KIP-1271 to serialize values with timestamps and headers 
for state stores.
  */
-public class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
     public final Serializer<V> valueSerializer;
     private final LongSerializer timestampSerializer;
-    private final HeadersSerializer headersSerializer;
 
     ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
         Objects.requireNonNull(valueSerializer);
         this.valueSerializer = valueSerializer;
         this.timestampSerializer = new LongSerializer();
-        this.headersSerializer = new HeadersSerializer();
     }
 
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
         valueSerializer.configure(configs, isKey);
         timestampSerializer.configure(configs, isKey);
-        headersSerializer.configure(configs, isKey);
     }
 
     @Override
@@ -95,7 +92,7 @@ public class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSeria
         final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
timestamp);
 
         // empty (byte[0]) for null/empty headers, or 
[count][header1][header2]... for non-empty
-        final byte[] rawHeaders = headersSerializer.serialize(topic, headers);
+        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
 
         // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -116,7 +113,6 @@ public class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSeria
     public void close() {
         valueSerializer.close();
         timestampSerializer.close();
-        headersSerializer.close();
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index b07a2756d4c..8f6e1ce3d7a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -31,12 +31,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class HeadersDeserializerTest {
 
-    private final HeadersSerializer serializer = new HeadersSerializer();
-    private final HeadersDeserializer deserializer = new HeadersDeserializer();
-
     @Test
     public void shouldDeserializeNullData() {
-        final Headers headers = deserializer.deserialize("", null);
+        final Headers headers = HeadersDeserializer.deserialize(null);
 
         assertNotNull(headers);
         assertEquals(0, headers.toArray().length);
@@ -44,7 +41,7 @@ public class HeadersDeserializerTest {
 
     @Test
     public void shouldDeserializeEmptyData() {
-        final Headers headers = deserializer.deserialize("", new byte[0]);
+        final Headers headers = HeadersDeserializer.deserialize(new byte[0]);
 
         assertNotNull(headers);
         assertEquals(0, headers.toArray().length);
@@ -53,8 +50,8 @@ public class HeadersDeserializerTest {
     @Test
     public void shouldRoundTripEmptyHeaders() {
         final Headers original = new RecordHeaders();
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
         assertEquals(0, deserialized.toArray().length);
@@ -64,8 +61,8 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripSingleHeader() {
         final Headers original = new RecordHeaders()
             .add("key1", "value1".getBytes());
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
@@ -82,8 +79,8 @@ public class HeadersDeserializerTest {
             .add("key0", "value0".getBytes())
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes());
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
 
         final Header[] headerArray = deserialized.toArray();
@@ -99,8 +96,8 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripHeaderWithNullValue() {
         final Headers original = new RecordHeaders()
             .add("key1", null);
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
@@ -115,8 +112,8 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripHeaderWithEmptyValue() {
         final Headers original = new RecordHeaders()
             .add("key1", new byte[0]);
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
@@ -135,8 +132,8 @@ public class HeadersDeserializerTest {
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes())
             .add("key2", "value3".getBytes());
-        final byte[] serialized = serializer.serialize("", original);
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final byte[] serialized = HeadersSerializer.serialize(original);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
 
         final Header[] headerArray = deserialized.toArray();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index bb2ab983475..7c791c93984 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -30,12 +30,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HeadersSerializerTest {
 
-    private final HeadersSerializer serializer = new HeadersSerializer();
-    private final HeadersDeserializer deserializer = new HeadersDeserializer();
-
     @Test
     public void shouldSerializeNullHeaders() {
-        final byte[] serialized = serializer.serialize("", null);
+        final byte[] serialized = HeadersSerializer.serialize(null);
 
         assertNotNull(serialized);
         assertEquals(0, serialized.length, "Null headers should serialize to 
empty byte array (0 bytes)");
@@ -44,7 +41,7 @@ public class HeadersSerializerTest {
     @Test
     public void shouldSerializeEmptyHeaders() {
         final Headers headers = new RecordHeaders();
-        final byte[] serialized = serializer.serialize("", headers);
+        final byte[] serialized = HeadersSerializer.serialize(headers);
 
         assertNotNull(serialized);
         assertEquals(0, serialized.length, "Empty headers should serialize to 
empty byte array (0 bytes)");
@@ -54,12 +51,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeSingleHeader() {
         final Headers headers = new RecordHeaders()
             .add("key1", "value1".getBytes());
-        final byte[] serialized = serializer.serialize("", headers);
+        final byte[] serialized = HeadersSerializer.serialize(headers);
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
 
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
 
@@ -75,12 +72,12 @@ public class HeadersSerializerTest {
             .add("key0", "value0".getBytes())
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes());
-        final byte[] serialized = serializer.serialize("", headers);
+        final byte[] serialized = HeadersSerializer.serialize(headers);
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
 
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
         assertEquals(3, deserialized.toArray().length);
 
@@ -96,12 +93,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeHeaderWithNullValue() {
         final Headers headers = new RecordHeaders()
             .add("key1", null);
-        final byte[] serialized = serializer.serialize("", headers);
+        final byte[] serialized = HeadersSerializer.serialize(headers);
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
 
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
 
@@ -115,12 +112,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeHeadersWithEmptyValue() {
         final Headers headers = new RecordHeaders()
             .add("key1", new byte[0]);
-        final byte[] serialized = serializer.serialize("", headers);
+        final byte[] serialized = HeadersSerializer.serialize(headers);
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
 
-        final Headers deserialized = deserializer.deserialize("", serialized);
+        final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
         assertNotNull(deserialized);
         assertEquals(1, deserialized.toArray().length);
 

Reply via email to