This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ef14f76fb30 KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas (#19404) ef14f76fb30 is described below commit ef14f76fb30b914b210f0dcd405a3e0f64bb9374 Author: lorcan <lorcan.ja...@yahoo.com> AuthorDate: Thu May 29 16:57:10 2025 +0100 KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas (#19404) Addresses: [KAFKA-6629](https://issues.apache.org/jira/browse/KAFKA-6629) Adds configuration for the SessionKeySchema and parameterises the existing tests so that both WindowKeys and SessionKeys are tested under the existing unit tests. Reviewers: Bill Bejeck <bbej...@apache.org> --------- Co-authored-by: Lorcan <lorcanjam...@gmail.com> --- .../internals/SegmentedCacheFunctionTest.java | 131 +++++++++++++++------ 1 file changed, 92 insertions(+), 39 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java index 7a72c0f98af..09bf80b72d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java @@ -19,97 +19,151 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.nio.ByteBuffer; +import java.util.stream.Stream; +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; -// TODO: this test coverage does not consider session serde yet -public class SegmentedCacheFunctionTest { +class SegmentedCacheFunctionTest { private static final int SEGMENT_INTERVAL = 17; - private static final int TIMESTAMP = 736213517; + private static final int START_TIMESTAMP = 736213517; + private static final int END_TIMESTAMP = 800000000; - private static final Bytes THE_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42); - private static final Bytes THE_CACHE_KEY = Bytes.wrap( - ByteBuffer.allocate(8 + THE_KEY.get().length) - .putLong(TIMESTAMP / SEGMENT_INTERVAL) - .put(THE_KEY.get()).array() + private static final Bytes THE_WINDOW_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, START_TIMESTAMP, 42); + private static final Bytes THE_SESSION_KEY = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, END_TIMESTAMP, START_TIMESTAMP); + + private static final Bytes THE_WINDOW_CACHE_KEY = Bytes.wrap( + ByteBuffer.allocate(8 + THE_WINDOW_KEY.get().length) + .putLong(START_TIMESTAMP / SEGMENT_INTERVAL) + .put(THE_WINDOW_KEY.get()).array() ); - private final SegmentedCacheFunction cacheFunction = new SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL); + private static final Bytes THE_SESSION_CACHE_KEY = Bytes.wrap( + ByteBuffer.allocate(8 + THE_SESSION_KEY.get().length) + .putLong(END_TIMESTAMP / SEGMENT_INTERVAL) + .put(THE_SESSION_KEY.get()).array() + ); + + private SegmentedCacheFunction createCacheFunction(final SegmentedBytesStore.KeySchema keySchema) { + return new SegmentedCacheFunction(keySchema, SEGMENT_INTERVAL); + } - @Test - public void key() { - assertThat( - cacheFunction.key(THE_CACHE_KEY), - equalTo(THE_KEY) + private static Stream<Arguments> provideKeysAndSchemas() { + return Stream.of( + Arguments.of(THE_WINDOW_CACHE_KEY, THE_WINDOW_KEY, new WindowKeySchema()), + Arguments.of(THE_SESSION_CACHE_KEY, THE_SESSION_KEY, new SessionKeySchema()) + ); + } + + private static Stream<Arguments> provideKeysTimestampsAndSchemas() { + return Stream.of( + Arguments.of(THE_WINDOW_KEY, START_TIMESTAMP, new WindowKeySchema()), + Arguments.of(THE_SESSION_KEY, END_TIMESTAMP, new SessionKeySchema()) + ); + } + + private static Stream<Arguments> provideKeysForBoundaryChecks() { + final Bytes sameKeyInPriorSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42); + final Bytes sameKeyInPriorSegmentSession = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 12345); + + final Bytes lowerKeyInSameSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, START_TIMESTAMP - 1, 0); + final Bytes lowerKeyInSameSegmentSession = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, END_TIMESTAMP - 1, START_TIMESTAMP + 1); + + return Stream.of( + Arguments.of(THE_WINDOW_KEY, new WindowKeySchema(), sameKeyInPriorSegmentWindow, lowerKeyInSameSegmentWindow), + Arguments.of(THE_SESSION_KEY, new SessionKeySchema(), sameKeyInPriorSegmentSession, lowerKeyInSameSegmentSession) ); } - @Test - public void cacheKey() { - final long segmentId = TIMESTAMP / SEGMENT_INTERVAL; + static Bytes toStoreKeyBinary(final byte[] serializedKey, + final long endTime, + final long startTime) { + final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + TIMESTAMP_SIZE); + buf.put(serializedKey); + buf.putLong(endTime); + buf.putLong(startTime); - final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY); + return Bytes.wrap(buf.array()); + } + + @ParameterizedTest + @MethodSource("provideKeysAndSchemas") + void testKey(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) { + assertThat( + createCacheFunction(keySchema).key(cacheKey), + equalTo(key) + ); + } + + @ParameterizedTest + @MethodSource("provideKeysTimestampsAndSchemas") + void cacheKey(final Bytes key, final int timeStamp, final SegmentedBytesStore.KeySchema keySchema) { + final long segmentId = timeStamp / SEGMENT_INTERVAL; + final Bytes actualCacheKey = createCacheFunction(keySchema).cacheKey(key); final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get()); assertThat(buffer.getLong(), equalTo(segmentId)); final byte[] actualKey = new byte[buffer.remaining()]; buffer.get(actualKey); - assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY)); + assertThat(Bytes.wrap(actualKey), equalTo(key)); } - @Test - public void testRoundTripping() { + @ParameterizedTest + @MethodSource("provideKeysAndSchemas") + void testRoundTripping(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) { + final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema); + assertThat( - cacheFunction.key(cacheFunction.cacheKey(THE_KEY)), - equalTo(THE_KEY) + cacheFunction.key(cacheFunction.cacheKey(key)), + equalTo(key) ); assertThat( - cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)), - equalTo(THE_CACHE_KEY) + cacheFunction.cacheKey(cacheFunction.key(cacheKey)), + equalTo(cacheKey) ); } - @Test - public void compareSegmentedKeys() { + @ParameterizedTest + @MethodSource("provideKeysForBoundaryChecks") + void compareSegmentedKeys(final Bytes key, final SegmentedBytesStore.KeySchema keySchema, final Bytes sameKeyInPriorSegment, final Bytes lowerKeyInSameSegment) { + final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema); assertThat( "same key in same segment should be ranked the same", cacheFunction.compareSegmentedKeys( - cacheFunction.cacheKey(THE_KEY), - THE_KEY + cacheFunction.cacheKey(key), + key ) == 0 ); - final Bytes sameKeyInPriorSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42); - assertThat( "same keys in different segments should be ordered according to segment", cacheFunction.compareSegmentedKeys( cacheFunction.cacheKey(sameKeyInPriorSegment), - THE_KEY + key ) < 0 ); assertThat( "same keys in different segments should be ordered according to segment", cacheFunction.compareSegmentedKeys( - cacheFunction.cacheKey(THE_KEY), + cacheFunction.cacheKey(key), sameKeyInPriorSegment ) > 0 ); - final Bytes lowerKeyInSameSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0); - assertThat( "different keys in same segments should be ordered according to key", cacheFunction.compareSegmentedKeys( - cacheFunction.cacheKey(THE_KEY), + cacheFunction.cacheKey(key), lowerKeyInSameSegment ) > 0 ); @@ -118,9 +172,8 @@ public class SegmentedCacheFunctionTest { "different keys in same segments should be ordered according to key", cacheFunction.compareSegmentedKeys( cacheFunction.cacheKey(lowerKeyInSameSegment), - THE_KEY + key ) < 0 ); } - }