This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef14f76fb30 KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas (#19404)
ef14f76fb30 is described below

commit ef14f76fb30b914b210f0dcd405a3e0f64bb9374
Author: lorcan <lorcan.ja...@yahoo.com>
AuthorDate: Thu May 29 16:57:10 2025 +0100

    KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas (#19404)
    
    Addresses:
    [KAFKA-6629](https://issues.apache.org/jira/browse/KAFKA-6629)
    
    Adds configuration for the SessionKeySchema and parameterises the
    existing tests so that both WindowKeys and SessionKeys are covered by
    the existing unit tests.
    
    Reviewers: Bill Bejeck <bbej...@apache.org>
    
    ---------
    
    Co-authored-by: Lorcan <lorcanjam...@gmail.com>
---
 .../internals/SegmentedCacheFunctionTest.java      | 131 +++++++++++++++------
 1 file changed, 92 insertions(+), 39 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
index 7a72c0f98af..09bf80b72d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
@@ -19,97 +19,151 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.nio.ByteBuffer;
+import java.util.stream.Stream;
 
+import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
-// TODO: this test coverage does not consider session serde yet
-public class SegmentedCacheFunctionTest {
+class SegmentedCacheFunctionTest {
 
     private static final int SEGMENT_INTERVAL = 17;
-    private static final int TIMESTAMP = 736213517;
+    private static final int START_TIMESTAMP = 736213517;
+    private static final int END_TIMESTAMP = 800000000;
 
-    private static final Bytes THE_KEY = WindowKeySchema.toStoreKeyBinary(new 
byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
-    private static final Bytes THE_CACHE_KEY = Bytes.wrap(
-        ByteBuffer.allocate(8 + THE_KEY.get().length)
-            .putLong(TIMESTAMP / SEGMENT_INTERVAL)
-            .put(THE_KEY.get()).array()
+    private static final Bytes THE_WINDOW_KEY = 
WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, START_TIMESTAMP, 
42);
+    private static final Bytes THE_SESSION_KEY = toStoreKeyBinary(new 
byte[]{0xA, 0xB, 0xC}, END_TIMESTAMP, START_TIMESTAMP);
+
+    private static final Bytes THE_WINDOW_CACHE_KEY = Bytes.wrap(
+        ByteBuffer.allocate(8 + THE_WINDOW_KEY.get().length)
+            .putLong(START_TIMESTAMP / SEGMENT_INTERVAL)
+            .put(THE_WINDOW_KEY.get()).array()
     );
 
-    private final SegmentedCacheFunction cacheFunction = new 
SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL);
+    private static final Bytes THE_SESSION_CACHE_KEY = Bytes.wrap(
+        ByteBuffer.allocate(8 + THE_SESSION_KEY.get().length)
+                .putLong(END_TIMESTAMP / SEGMENT_INTERVAL)
+                .put(THE_SESSION_KEY.get()).array()
+    );
+    
+    private SegmentedCacheFunction createCacheFunction(final 
SegmentedBytesStore.KeySchema keySchema) {
+        return new SegmentedCacheFunction(keySchema, SEGMENT_INTERVAL);
+    }
 
-    @Test
-    public void key() {
-        assertThat(
-            cacheFunction.key(THE_CACHE_KEY),
-            equalTo(THE_KEY)
+    private static Stream<Arguments> provideKeysAndSchemas() {
+        return Stream.of(
+                Arguments.of(THE_WINDOW_CACHE_KEY, THE_WINDOW_KEY, new 
WindowKeySchema()),
+                Arguments.of(THE_SESSION_CACHE_KEY, THE_SESSION_KEY, new 
SessionKeySchema())
+        );
+    }
+
+    private static Stream<Arguments> provideKeysTimestampsAndSchemas() {
+        return Stream.of(
+                Arguments.of(THE_WINDOW_KEY, START_TIMESTAMP, new 
WindowKeySchema()),
+                Arguments.of(THE_SESSION_KEY, END_TIMESTAMP, new 
SessionKeySchema())
+        );
+    }
+
+    private static Stream<Arguments> provideKeysForBoundaryChecks() {
+        final Bytes sameKeyInPriorSegmentWindow = 
WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
+        final Bytes sameKeyInPriorSegmentSession = toStoreKeyBinary(new 
byte[]{0xA, 0xB, 0xC}, 1234, 12345);
+
+        final Bytes lowerKeyInSameSegmentWindow = 
WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, START_TIMESTAMP - 
1, 0);
+        final Bytes lowerKeyInSameSegmentSession = toStoreKeyBinary(new 
byte[]{0xA, 0xB, 0xB}, END_TIMESTAMP - 1, START_TIMESTAMP + 1);
+        
+        return Stream.of(
+                Arguments.of(THE_WINDOW_KEY, new WindowKeySchema(), 
sameKeyInPriorSegmentWindow, lowerKeyInSameSegmentWindow),
+                Arguments.of(THE_SESSION_KEY, new SessionKeySchema(), 
sameKeyInPriorSegmentSession, lowerKeyInSameSegmentSession)
         );
     }
 
-    @Test
-    public void cacheKey() {
-        final long segmentId = TIMESTAMP / SEGMENT_INTERVAL;
+    static Bytes toStoreKeyBinary(final byte[] serializedKey,
+                                  final long endTime,
+                                  final long startTime) {
+        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
TIMESTAMP_SIZE + TIMESTAMP_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(endTime);
+        buf.putLong(startTime);
 
-        final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY);
+        return Bytes.wrap(buf.array());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideKeysAndSchemas")
+    void testKey(final Bytes cacheKey, final Bytes key, final 
SegmentedBytesStore.KeySchema keySchema) {
+        assertThat(
+                createCacheFunction(keySchema).key(cacheKey),
+                equalTo(key)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideKeysTimestampsAndSchemas")
+    void cacheKey(final Bytes key, final int timeStamp, final 
SegmentedBytesStore.KeySchema keySchema) {
+        final long segmentId = timeStamp / SEGMENT_INTERVAL;
+        final Bytes actualCacheKey = 
createCacheFunction(keySchema).cacheKey(key);
         final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get());
 
         assertThat(buffer.getLong(), equalTo(segmentId));
 
         final byte[] actualKey = new byte[buffer.remaining()];
         buffer.get(actualKey);
-        assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY));
+        assertThat(Bytes.wrap(actualKey), equalTo(key));
     }
 
-    @Test
-    public void testRoundTripping() {
+    @ParameterizedTest
+    @MethodSource("provideKeysAndSchemas")
+    void testRoundTripping(final Bytes cacheKey, final Bytes key, final 
SegmentedBytesStore.KeySchema keySchema) {
+        final SegmentedCacheFunction cacheFunction = 
createCacheFunction(keySchema);
+
         assertThat(
-            cacheFunction.key(cacheFunction.cacheKey(THE_KEY)),
-            equalTo(THE_KEY)
+            cacheFunction.key(cacheFunction.cacheKey(key)),
+            equalTo(key)
         );
 
         assertThat(
-            cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)),
-            equalTo(THE_CACHE_KEY)
+            cacheFunction.cacheKey(cacheFunction.key(cacheKey)),
+            equalTo(cacheKey)
         );
     }
 
-    @Test
-    public void compareSegmentedKeys() {
+    @ParameterizedTest
+    @MethodSource("provideKeysForBoundaryChecks")
+    void compareSegmentedKeys(final Bytes key, final 
SegmentedBytesStore.KeySchema keySchema, final Bytes sameKeyInPriorSegment, 
final Bytes lowerKeyInSameSegment) {
+        final SegmentedCacheFunction cacheFunction = 
createCacheFunction(keySchema);
         assertThat(
             "same key in same segment should be ranked the same",
             cacheFunction.compareSegmentedKeys(
-                cacheFunction.cacheKey(THE_KEY),
-                THE_KEY
+                cacheFunction.cacheKey(key),
+                key
             ) == 0
         );
 
-        final Bytes sameKeyInPriorSegment = 
WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
-
         assertThat(
             "same keys in different segments should be ordered according to 
segment",
             cacheFunction.compareSegmentedKeys(
                 cacheFunction.cacheKey(sameKeyInPriorSegment),
-                THE_KEY
+                key
             ) < 0
         );
 
         assertThat(
             "same keys in different segments should be ordered according to 
segment",
             cacheFunction.compareSegmentedKeys(
-                cacheFunction.cacheKey(THE_KEY),
+                cacheFunction.cacheKey(key),
                 sameKeyInPriorSegment
             ) > 0
         );
 
-        final Bytes lowerKeyInSameSegment = 
WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);
-
         assertThat(
             "different keys in same segments should be ordered according to 
key",
             cacheFunction.compareSegmentedKeys(
-                cacheFunction.cacheKey(THE_KEY),
+                cacheFunction.cacheKey(key),
                 lowerKeyInSameSegment
             ) > 0
         );
@@ -118,9 +172,8 @@ public class SegmentedCacheFunctionTest {
             "different keys in same segments should be ordered according to 
key",
             cacheFunction.compareSegmentedKeys(
                 cacheFunction.cacheKey(lowerKeyInSameSegment),
-                THE_KEY
+                key
             ) < 0
         );
     }
-
 }

Reply via email to