This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a5f9653f25 KAFKA-20122: Create HeadersBytesStore (#21409)
9a5f9653f25 is described below
commit 9a5f9653f25387eb814066bb94d5b78943fce10f
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 11 08:16:54 2026 +0000
KAFKA-20122: Create HeadersBytesStore (#21409)
This PR creates HeadersBytesStore as infrastructure of KIP-1271.
Reviewers: Alieh Saeedi <[email protected]>, Genseric Ghiro
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../kafka/streams/state/HeadersBytesStore.java | 61 ++++++++++++++++++++
.../kafka/streams/state/HeadersBytesStoreTest.java | 67 ++++++++++++++++++++++
2 files changed, 128 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
new file mode 100644
index 00000000000..b69875b755b
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Marker interface to indicate that a bytes store understands the
value-with-headers format
+ * and can convert legacy timestamped value entries (ValueAndTimestamp format)
to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware
stores.
+ * <p>
+ * Per KIP-1271, the value format is:
[headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g.,
[timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+ /**
+ * Converts a legacy timestamped value (ValueAndTimestamp format, without
headers) to the header-embedded format.
+ * <p>
+ * For timestamped stores, the legacy format is: [timestamp(8)][value]
+ * The new format is:
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ * <p>
+ * This method adds empty headers to the existing timestamped value format.
+ * <p>
+ * Empty headers are represented as 0 bytes (headersSize=0, no
headersBytes),
+ *
+ * @param plainValue the legacy timestamped value bytes
+ * @return the value in header-embedded format with empty headers
+ */
+ static byte[] convertToHeaderFormat(final byte[] plainValue) {
+ if (plainValue == null) {
+ return null;
+ }
+
+ // Format: [headersSize(varint)][headersBytes][payload]
+ // For empty headers:
+ // headersSize = varint(0) = [0x00]
+ // headersBytes = [] (empty, 0 bytes)
+ // Result: [0x00][payload]
+ return ByteBuffer
+ .allocate(1 + plainValue.length)
+ .put((byte) 0x00)
+ .put(plainValue)
+ .array();
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
new file mode 100644
index 00000000000..e71d05f792c
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HeadersBytesStoreTest {
+
+ @Test
+ public void shouldReturnNullWhenConvertingNullValue() {
+ final byte[] result = HeadersBytesStore.convertToHeaderFormat(null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldConvertLegacyValueToHeaderFormat() {
+ final byte[] legacyValue = "test-value".getBytes();
+
+ final byte[] converted =
HeadersBytesStore.convertToHeaderFormat(legacyValue);
+
+ assertNotNull(converted);
+ assertEquals(legacyValue.length + 1, converted.length, "converted
bytes should have empty header bytes");
+ assertEquals(0x00, converted[0], "First byte for empty header should
be the 0x00");
+ byte[] actualPayload = Arrays.copyOfRange(converted, 1,
converted.length);
+ assertArrayEquals(legacyValue, actualPayload);
+ }
+
+ @Test
+ public void shouldConvertEmptyValueToHeaderFormat() {
+ final byte[] emptyValue = new byte[0];
+
+ final byte[] converted =
HeadersBytesStore.convertToHeaderFormat(emptyValue);
+
+ assertNotNull(converted);
+ assertTrue(converted.length > 0, "Converted value should have headers
metadata");
+
+ final ByteBuffer buffer = ByteBuffer.wrap(converted);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ assertEquals(0, headersSize, "Empty headers should have headersSize =
0");
+ assertEquals(0, buffer.remaining(), "No payload bytes for empty
value");
+ }
+}