This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a5f9653f25 KAFKA-20122: Create HeadersBytesStore (#21409)
9a5f9653f25 is described below

commit 9a5f9653f25387eb814066bb94d5b78943fce10f
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 11 08:16:54 2026 +0000

    KAFKA-20122: Create HeadersBytesStore (#21409)
    
    This PR creates HeadersBytesStore as infrastructure of KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Genseric Ghiro
     <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../kafka/streams/state/HeadersBytesStore.java     | 61 ++++++++++++++++++++
 .../kafka/streams/state/HeadersBytesStoreTest.java | 67 ++++++++++++++++++++++
 2 files changed, 128 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
new file mode 100644
index 00000000000..b69875b755b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Marker interface to indicate that a bytes store understands the 
value-with-headers format
+ * and can convert legacy timestamped value entries (ValueAndTimestamp format) 
to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware 
stores.
+ * <p>
+ * Per KIP-1271, the value format is: 
[headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., 
[timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy timestamped value (ValueAndTimestamp format, without 
headers) to the header-embedded format.
+     * <p>
+     * For timestamped stores, the legacy format is: [timestamp(8)][value]
+     * The new format is: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
+     * <p>
+     * This method adds empty headers to the existing timestamped value format.
+     * <p>
+     * Empty headers are represented as 0 bytes (headersSize=0, no 
headersBytes),
+     *
+     * @param plainValue the legacy timestamped value bytes
+     * @return the value in header-embedded format with empty headers
+     */
+    static byte[] convertToHeaderFormat(final byte[] plainValue) {
+        if (plainValue == null) {
+            return null;
+        }
+
+        // Format: [headersSize(varint)][headersBytes][payload]
+        // For empty headers:
+        //   headersSize = varint(0) = [0x00]
+        //   headersBytes = [] (empty, 0 bytes)
+        // Result: [0x00][payload]
+        return ByteBuffer
+            .allocate(1 + plainValue.length)
+            .put((byte) 0x00)
+            .put(plainValue)
+            .array();
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
new file mode 100644
index 00000000000..e71d05f792c
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HeadersBytesStoreTest {
+
+    @Test
+    public void shouldReturnNullWhenConvertingNullValue() {
+        final byte[] result = HeadersBytesStore.convertToHeaderFormat(null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldConvertLegacyValueToHeaderFormat() {
+        final byte[] legacyValue = "test-value".getBytes();
+
+        final byte[] converted = 
HeadersBytesStore.convertToHeaderFormat(legacyValue);
+
+        assertNotNull(converted);
+        assertEquals(legacyValue.length + 1, converted.length, "converted 
bytes should have empty header bytes");
+        assertEquals(0x00, converted[0], "First byte for empty header should 
be the 0x00");
+        byte[] actualPayload = Arrays.copyOfRange(converted, 1, 
converted.length);
+        assertArrayEquals(legacyValue, actualPayload);
+    }
+
+    @Test
+    public void shouldConvertEmptyValueToHeaderFormat() {
+        final byte[] emptyValue = new byte[0];
+
+        final byte[] converted = 
HeadersBytesStore.convertToHeaderFormat(emptyValue);
+
+        assertNotNull(converted);
+        assertTrue(converted.length > 0, "Converted value should have headers 
metadata");
+
+        final ByteBuffer buffer = ByteBuffer.wrap(converted);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        assertEquals(0, headersSize, "Empty headers should have headersSize = 
0");
+        assertEquals(0, buffer.remaining(), "No payload bytes for empty 
value");
+    }
+}

Reply via email to