This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1515fddef07 MINOR: Optimize empty headers serialization format (#21419)
1515fddef07 is described below

commit 1515fddef07598ea56508fb82d2fb3546a52f831
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 6 18:21:18 2026 +0000

    MINOR: Optimize empty headers serialization format (#21419)
    
    This is a follow-up of #21401. Optimize `HeadersSerializer` to use 0
    bytes for empty headers instead of 1 byte (`headerCount=0`).
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../streams/state/internals/HeadersSerializer.java  | 21 ++++++++++++++++-----
 .../state/internals/HeadersSerializerTest.java      |  6 ++----
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index 94772ce700c..91a97e78e4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -30,7 +30,10 @@ import java.nio.charset.StandardCharsets;
  * Serializer for Kafka Headers.
  * <p>
  * Serialization format (per KIP-1271):
- * [NumHeaders(varint)][Header1][Header2]...
+ * <ul>
+ * <li>For null or empty headers: empty byte array (0 bytes)</li>
+ * <li>For non-empty headers: [NumHeaders(varint)][Header1][Header2]...</li>
+ * </ul>
  * <p>
  * Each header:
  * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
@@ -50,20 +53,28 @@ public class HeadersSerializer implements Serializer<Headers> {
      * <p>
      * The output format is [count][header1][header2]... without a size prefix.
      * The size prefix is added by the outer serializer that uses this.
+     * <p>
+     * For null or empty headers, returns an empty byte array (0 bytes)
+     * instead of encoding headerCount=0 (1 byte).
      *
      * @param topic topic associated with data
      * @param headers the headers to serialize (can be null)
-     * @return the serialized byte array
+     * @return the serialized byte array (empty array if headers are null or empty)
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers) {
+        final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();
+
+        if (headersArray.length == 0) {
+            return new byte[0];
+        }
+
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
 
-            final Header[] headerArray = (headers == null) ? new Header[0] : headers.toArray();
-            ByteUtils.writeVarint(headerArray.length, out);
+            ByteUtils.writeVarint(headersArray.length, out);
 
-            for (final Header header : headerArray) {
+            for (final Header header : headersArray) {
                 final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
                 final byte[] valueBytes = header.value();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index c6e59208720..dc0afdec7b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -38,8 +38,7 @@ public class HeadersSerializerTest {
         final byte[] serialized = serializer.serialize("", null);
 
         assertNotNull(serialized);
-        assertEquals(1, serialized.length, "Null headers should have 1 byte to indicate headers count is 0");
-        assertEquals(0, serialized[0], "The byte should be 0 (varint encoding of 0)");
+        assertEquals(0, serialized.length, "Null headers should serialize to empty byte array (0 bytes)");
     }
 
     @Test
@@ -48,8 +47,7 @@ public class HeadersSerializerTest {
         final byte[] serialized = serializer.serialize("", headers);
 
         assertNotNull(serialized);
-        assertEquals(1, serialized.length, "Empty headers should have 1 byte to indicate headers count is 0");
-        assertEquals(0, serialized[0], "The byte should be 0 (varint encoding of 0)");
+        assertEquals(0, serialized.length, "Empty headers should serialize to empty byte array (0 bytes)");
     }
 
     @Test

Reply via email to