This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4228890ecfe MINOR: follow-up of KAFKA-20317 (#22171)
4228890ecfe is described below

commit 4228890ecfe113789f73b0405dfdace09b2d7f4a
Author: TengYao Chi <[email protected]>
AuthorDate: Tue May 12 16:42:28 2026 +0100

    MINOR: follow-up of KAFKA-20317 (#22171)
    
    ref:
    https://github.com/apache/kafka/pull/21780#pullrequestreview-3969893622
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../TimeOrderedWindowStoreUpgradeTest.java         | 94 +++-------------------
 1 file changed, 13 insertions(+), 81 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
index 12829a3b995..3493a84a283 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
@@ -35,7 +35,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Properties;
 
@@ -104,39 +103,6 @@ public class TimeOrderedWindowStoreUpgradeTest {
         }
     }
 
-    /**
-     * Helper method to extract raw value from [headers][value] format.
-     * This is used for window stores with headers where the format is:
-     * [headersSize(varint)][headersBytes][value]
-     */
-    private static byte[] extractRawValue(final byte[] valueWithHeaders) {
-        if (valueWithHeaders == null) {
-            return null;
-        }
-        final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        buffer.position(buffer.position() + headersSize);
-        final byte[] rawValue = new byte[buffer.remaining()];
-        buffer.get(rawValue);
-        return rawValue;
-    }
-
-    /**
-     * Helper method to extract headers from [headers][value] format.
-     * This is used for window stores with headers where the format is:
-     * [headersSize(varint)][headersBytes][value]
-     */
-    private static Headers extractHeaders(final byte[] valueWithHeaders) {
-        if (valueWithHeaders == null) {
-            return null;
-        }
-        final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        final byte[] headerBytes = new byte[headersSize];
-        buffer.get(headerBytes);
-        return HeadersDeserializer.deserialize(headerBytes);
-    }
-
     @Test
     public void shouldMigrateFromWithoutHeadersToWithHeaders() {
         final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
@@ -146,8 +112,8 @@ public class TimeOrderedWindowStoreUpgradeTest {
                 SEGMENT_INTERVAL_MS,
                 WINDOW_SIZE_MS,
                 false,
-                true,
-                false   // withHeaders = FALSE (old format)
+                false,
+                false
             );
 
         final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
@@ -186,14 +152,14 @@ public class TimeOrderedWindowStoreUpgradeTest {
 
         // Verify we can read old data (lazy migration)
         byte[] fetch = newStore.fetch(key1, baseTime + 100);
-        assertEquals("value1", new String(extractRawValue(fetch)));
-        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+        assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data should have empty headers after migration");
         fetch = newStore.fetch(key2, baseTime + 200);
-        assertEquals("value2", new String(extractRawValue(fetch)));
-        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+        assertEquals("value2", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data should have empty headers after migration");
         fetch = newStore.fetch(key3, baseTime + 300);
-        assertEquals("value3", new String(extractRawValue(fetch)));
-        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+        assertEquals("value3", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data should have empty headers after migration");
 
 
         // Write new data (should use headers-CF)
@@ -209,17 +175,17 @@ public class TimeOrderedWindowStoreUpgradeTest {
         newStore.put(key4, serializeValueWithHeaders("value4".getBytes(), headersWithData), baseTime + 400);
 
         // Verify new data - raw values
-        assertEquals("value3-updated", new String(extractRawValue(newStore.fetch(key3, baseTime + 350))));
-        assertEquals("value4", new String(extractRawValue(newStore.fetch(key4, baseTime + 400))));
+        assertEquals("value3-updated", new String(Utils.rawAggregation(newStore.fetch(key3, baseTime + 350))));
+        assertEquals("value4", new String(Utils.rawAggregation(newStore.fetch(key4, baseTime + 400))));
 
         // Verify headers for key3 (empty headers)
         final byte[] fetchedKey3Value = newStore.fetch(key3, baseTime + 350);
-        final Headers retrievedKey3Headers = extractHeaders(fetchedKey3Value);
+        final Headers retrievedKey3Headers = Utils.headers(fetchedKey3Value);
         assertEquals(0, retrievedKey3Headers.toArray().length);
 
         // Verify headers for key4 (headers with data)
         final byte[] fetchedKey4Value = newStore.fetch(key4, baseTime + 400);
-        final Headers retrievedKey4Headers = extractHeaders(fetchedKey4Value);
+        final Headers retrievedKey4Headers = Utils.headers(fetchedKey4Value);
         assertEquals(2, retrievedKey4Headers.toArray().length);
         assertEquals("header-value-1", new String(retrievedKey4Headers.lastHeader("header-key-1").value()));
         assertEquals("header-value-2", new String(retrievedKey4Headers.lastHeader("header-key-2").value()));
@@ -256,41 +222,7 @@ public class TimeOrderedWindowStoreUpgradeTest {
         newStore.init(context, newStore);
 
         // Verify old data still accessible
-        assertEquals("value1", new String(extractRawValue(newStore.fetch(key1, baseTime + 100))));
-
-        newStore.close();
-    }
-
-    @Test
-    public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
-        // Test: withIndex=false, withHeaders=false → withIndex=true, withHeaders=true
-        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
-            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
-                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
-                false, false, false  // withIndex=false, withHeaders=false
-            );
-
-        final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
-        oldStore.init(context, oldStore);
-
-        final long baseTime = System.currentTimeMillis();
-        final Bytes key1 = Bytes.wrap("key1".getBytes());
-
-        oldStore.put(key1, "value1".getBytes(), baseTime + 100);
-        oldStore.close();
-
-        // Upgrade to both index and headers
-        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
-            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
-                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
-                false, true, true  // withIndex=true, withHeaders=true
-            );
-
-        final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
-        newStore.init(context, newStore);
-
-        // Verify old data still accessible
-        assertEquals("value1", new String(extractRawValue(newStore.fetch(key1, baseTime + 100))));
+        assertEquals("value1", new String(Utils.rawAggregation(newStore.fetch(key1, baseTime + 100))));
 
         newStore.close();
     }

Reply via email to