This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4228890ecfe MINOR: follow-up of KAFKA-20317 (#22171)
4228890ecfe is described below
commit 4228890ecfe113789f73b0405dfdace09b2d7f4a
Author: TengYao Chi <[email protected]>
AuthorDate: Tue May 12 16:42:28 2026 +0100
MINOR: follow-up of KAFKA-20317 (#22171)
ref:
https://github.com/apache/kafka/pull/21780#pullrequestreview-3969893622
Reviewers: Matthias J. Sax <[email protected]>
---
.../TimeOrderedWindowStoreUpgradeTest.java | 94 +++-------------------
1 file changed, 13 insertions(+), 81 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
index 12829a3b995..3493a84a283 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
@@ -35,7 +35,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Properties;
@@ -104,39 +103,6 @@ public class TimeOrderedWindowStoreUpgradeTest {
}
}
- /**
- * Helper method to extract raw value from [headers][value] format.
- * This is used for window stores with headers where the format is:
- * [headersSize(varint)][headersBytes][value]
- */
- private static byte[] extractRawValue(final byte[] valueWithHeaders) {
- if (valueWithHeaders == null) {
- return null;
- }
- final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headersSize);
- final byte[] rawValue = new byte[buffer.remaining()];
- buffer.get(rawValue);
- return rawValue;
- }
-
- /**
- * Helper method to extract headers from [headers][value] format.
- * This is used for window stores with headers where the format is:
- * [headersSize(varint)][headersBytes][value]
- */
- private static Headers extractHeaders(final byte[] valueWithHeaders) {
- if (valueWithHeaders == null) {
- return null;
- }
- final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- final byte[] headerBytes = new byte[headersSize];
- buffer.get(headerBytes);
- return HeadersDeserializer.deserialize(headerBytes);
- }
-
@Test
public void shouldMigrateFromWithoutHeadersToWithHeaders() {
final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
@@ -146,8 +112,8 @@ public class TimeOrderedWindowStoreUpgradeTest {
SEGMENT_INTERVAL_MS,
WINDOW_SIZE_MS,
false,
- true,
- false // withHeaders = FALSE (old format)
+ false,
+ false
);
final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
@@ -186,14 +152,14 @@ public class TimeOrderedWindowStoreUpgradeTest {
// Verify we can read old data (lazy migration)
byte[] fetch = newStore.fetch(key1, baseTime + 100);
- assertEquals("value1", new String(extractRawValue(fetch)));
- assertEquals(0, extractHeaders(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
fetch = newStore.fetch(key2, baseTime + 200);
- assertEquals("value2", new String(extractRawValue(fetch)));
- assertEquals(0, extractHeaders(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value2", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
fetch = newStore.fetch(key3, baseTime + 300);
- assertEquals("value3", new String(extractRawValue(fetch)));
- assertEquals(0, extractHeaders(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value3", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
// Write new data (should use headers-CF)
@@ -209,17 +175,17 @@ public class TimeOrderedWindowStoreUpgradeTest {
newStore.put(key4, serializeValueWithHeaders("value4".getBytes(),
headersWithData), baseTime + 400);
// Verify new data - raw values
- assertEquals("value3-updated", new
String(extractRawValue(newStore.fetch(key3, baseTime + 350))));
- assertEquals("value4", new String(extractRawValue(newStore.fetch(key4,
baseTime + 400))));
+ assertEquals("value3-updated", new
String(Utils.rawAggregation(newStore.fetch(key3, baseTime + 350))));
+ assertEquals("value4", new
String(Utils.rawAggregation(newStore.fetch(key4, baseTime + 400))));
// Verify headers for key3 (empty headers)
final byte[] fetchedKey3Value = newStore.fetch(key3, baseTime + 350);
- final Headers retrievedKey3Headers = extractHeaders(fetchedKey3Value);
+ final Headers retrievedKey3Headers = Utils.headers(fetchedKey3Value);
assertEquals(0, retrievedKey3Headers.toArray().length);
// Verify headers for key4 (headers with data)
final byte[] fetchedKey4Value = newStore.fetch(key4, baseTime + 400);
- final Headers retrievedKey4Headers = extractHeaders(fetchedKey4Value);
+ final Headers retrievedKey4Headers = Utils.headers(fetchedKey4Value);
assertEquals(2, retrievedKey4Headers.toArray().length);
assertEquals("header-value-1", new
String(retrievedKey4Headers.lastHeader("header-key-1").value()));
assertEquals("header-value-2", new
String(retrievedKey4Headers.lastHeader("header-key-2").value()));
@@ -256,41 +222,7 @@ public class TimeOrderedWindowStoreUpgradeTest {
newStore.init(context, newStore);
// Verify old data still accessible
- assertEquals("value1", new String(extractRawValue(newStore.fetch(key1,
baseTime + 100))));
-
- newStore.close();
- }
-
- @Test
- public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
- // Test: withIndex=false, withHeaders=false → withIndex=true,
withHeaders=true
- final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
- new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
- STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
- false, false, false // withIndex=false, withHeaders=false
- );
-
- final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
- oldStore.init(context, oldStore);
-
- final long baseTime = System.currentTimeMillis();
- final Bytes key1 = Bytes.wrap("key1".getBytes());
-
- oldStore.put(key1, "value1".getBytes(), baseTime + 100);
- oldStore.close();
-
- // Upgrade to both index and headers
- final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
- new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
- STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
- false, true, true // withIndex=true, withHeaders=true
- );
-
- final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
- newStore.init(context, newStore);
-
- // Verify old data still accessible
- assertEquals("value1", new String(extractRawValue(newStore.fetch(key1,
baseTime + 100))));
+ assertEquals("value1", new
String(Utils.rawAggregation(newStore.fetch(key1, baseTime + 100))));
newStore.close();
}