This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1515fddef07 MINOR: Optimize empty headers serialization format (#21419)
1515fddef07 is described below
commit 1515fddef07598ea56508fb82d2fb3546a52f831
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 6 18:21:18 2026 +0000
MINOR: Optimize empty headers serialization format (#21419)
This is a follow-up of #21401. Optimize `HeadersSerializer` to use 0
bytes for empty headers instead of 1 byte (`headerCount=0`).
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/state/internals/HeadersSerializer.java | 21 ++++++++++++++++-----
.../state/internals/HeadersSerializerTest.java | 6 ++----
2 files changed, 18 insertions(+), 9 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index 94772ce700c..91a97e78e4b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -30,7 +30,10 @@ import java.nio.charset.StandardCharsets;
* Serializer for Kafka Headers.
* <p>
* Serialization format (per KIP-1271):
- * [NumHeaders(varint)][Header1][Header2]...
+ * <ul>
+ * <li>For null or empty headers: empty byte array (0 bytes)</li>
+ * <li>For non-empty headers: [NumHeaders(varint)][Header1][Header2]...</li>
+ * </ul>
* <p>
* Each header:
* [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
@@ -50,20 +53,28 @@ public class HeadersSerializer implements
Serializer<Headers> {
* <p>
* The output format is [count][header1][header2]... without a size prefix.
* The size prefix is added by the outer serializer that uses this.
+ * <p>
+ * For null or empty headers, returns an empty byte array (0 bytes)
+ * instead of encoding headerCount=0 (1 byte).
*
* @param topic topic associated with data
* @param headers the headers to serialize (can be null)
- * @return the serialized byte array
+ * @return the serialized byte array (empty array if headers are null or
empty)
*/
@Override
public byte[] serialize(final String topic, final Headers headers) {
+ final Header[] headersArray = (headers == null) ? new Header[0] :
headers.toArray();
+
+ if (headersArray.length == 0) {
+ return new byte[0];
+ }
+
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
- final Header[] headerArray = (headers == null) ? new Header[0] :
headers.toArray();
- ByteUtils.writeVarint(headerArray.length, out);
+ ByteUtils.writeVarint(headersArray.length, out);
- for (final Header header : headerArray) {
+ for (final Header header : headersArray) {
final byte[] keyBytes =
header.key().getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = header.value();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index c6e59208720..dc0afdec7b0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -38,8 +38,7 @@ public class HeadersSerializerTest {
final byte[] serialized = serializer.serialize("", null);
assertNotNull(serialized);
- assertEquals(1, serialized.length, "Null headers should have 1 byte to
indicate headers count is 0");
- assertEquals(0, serialized[0], "The byte should be 0 (varint encoding
of 0)");
+ assertEquals(0, serialized.length, "Null headers should serialize to
empty byte array (0 bytes)");
}
@Test
@@ -48,8 +47,7 @@ public class HeadersSerializerTest {
final byte[] serialized = serializer.serialize("", headers);
assertNotNull(serialized);
- assertEquals(1, serialized.length, "Empty headers should have 1 byte
to indicate headers count is 0");
- assertEquals(0, serialized[0], "The byte should be 0 (varint encoding
of 0)");
+ assertEquals(0, serialized.length, "Empty headers should serialize to
empty byte array (0 bytes)");
}
@Test