[ https://issues.apache.org/jira/browse/KAFKA-6628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466252#comment-16466252 ]
ASF GitHub Bot commented on KAFKA-6628: --------------------------------------- guozhangwang closed pull request #4836: KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes URL: https://github.com/apache/kafka/pull/4836 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index bd2fa9110a5..db6d1d1bf22 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -22,16 +22,25 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.Parameter; + +import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; + import java.io.File; import 
java.text.SimpleDateFormat; @@ -51,32 +60,58 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -// TODO: this test does not cover time window serdes + +@RunWith(Parameterized.class) public class RocksDBSegmentedBytesStoreTest { - private final long retention = 60000L; + private final long retention = 1000; private final int numSegments = 3; private InternalMockProcessorContext context; private final String storeName = "bytes-store"; private RocksDBSegmentedBytesStore bytesStore; private File stateDir; - private final SessionKeySchema schema = new SessionKeySchema(); + private long windowSizeForTimeWindow = 500; + private final Window[] windows = new Window[4]; + + @Parameter + public SegmentedBytesStore.KeySchema schema; + + @Parameters(name = "{0}") + public static Object[] getKeySchemas() { + return new Object[]{new SessionKeySchema(), new WindowKeySchema()}; + } @Before public void before() { schema.init("topic"); + + if (schema instanceof SessionKeySchema) { + windows[0] = new SessionWindow(10, 10); + windows[1] = new SessionWindow(500, 1000); + windows[2] = new SessionWindow(1000, 1500); + windows[3] = new SessionWindow(30000, 60000); + } + if (schema instanceof WindowKeySchema) { + + windows[0] = timeWindowForSize(10, windowSizeForTimeWindow); + windows[1] = timeWindowForSize(500, windowSizeForTimeWindow); + windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow); + windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow); + } + + bytesStore = new RocksDBSegmentedBytesStore(storeName, - retention, - numSegments, - schema); + retention, + numSegments, + schema); stateDir = TestUtils.tempDirectory(); context = new InternalMockProcessorContext( - stateDir, - Serdes.String(), - Serdes.Long(), - new NoOpRecordCollector(), - new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); + stateDir, + Serdes.String(), + Serdes.Long(), + new NoOpRecordCollector(), + new ThreadCache(new 
LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); bytesStore.init(context, bytesStore); } @@ -88,94 +123,83 @@ public void close() { @Test public void shouldPutAndFetch() { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 10L))), serializeValue(10L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(500L, 1000L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1500L, 2000L))), serializeValue(100L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(2500L, 3000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); + + final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500); - final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 10L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L)); + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L), + KeyValue.pair(new Windowed<>(key, windows[1]), 50L)); - final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L); assertEquals(expected, toList(values)); } @Test public void shouldFindValuesWithinRange() { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1000L, 1000L))), serializeValue(10L)); - final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L); - assertEquals(Collections.singletonList(KeyValue.pair(new 
Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); + final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999); + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L), + KeyValue.pair(new Windowed<>(key, windows[1]), 50L)); + + assertEquals(expected, toList(results)); } + @Test public void shouldRemove() { - bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))), serializeValue(30L)); - bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(1500, 2500))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(30)); + bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(50)); - bytesStore.remove(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000)))); - final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L); + bytesStore.remove(serializeKey(new Windowed<>("a", windows[0]))); + final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 100); assertFalse(value.hasNext()); } + @Test public void shouldRollSegments() { // just to validate directories final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); - assertEquals(Utils.mkSet(segments.segmentName(0), - 
segments.segmentName(1)), segmentDirs()); - - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L)); - assertEquals(Utils.mkSet(segments.segmentName(0), - segments.segmentName(1), - segments.segmentName(2)), segmentDirs()); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(500)); + assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(121000L, 180000L))), serializeValue(300L)); - assertEquals(Utils.mkSet(segments.segmentName(1), - segments.segmentName(2), - segments.segmentName(3)), segmentDirs()); + bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(1000)); + assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(181000L, 240000L))), serializeValue(400L)); - assertEquals(Utils.mkSet(segments.segmentName(2), - segments.segmentName(3), - segments.segmentName(4)), segmentDirs()); + final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500)); - final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000)); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(121000L, 180000L)), 300L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(181000L, 240000L)), 400L) - ), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + KeyValue.pair(new Windowed<>(key, windows[1]), 100L), + KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), results); } + 
@Test public void shouldGetAllSegments() { // just to validate directories final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); - assertEquals(Utils.mkSet(segments.segmentName(0), - segments.segmentName(1)), segmentDirs()); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); + assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); assertEquals(Utils.mkSet(segments.segmentName(0), - segments.segmentName(1), - segments.segmentName(2)), segmentDirs()); + segments.segmentName(1)), segmentDirs()); final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all()); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L) - ), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + KeyValue.pair(new Windowed<>(key, windows[3]), 100L) + ), results); } @@ -184,22 +208,18 @@ public void shouldFetchAllSegments() { // just to validate directories final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new 
Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); - assertEquals(Utils.mkSet(segments.segmentName(0), - segments.segmentName(1)), segmentDirs()); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); + assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); assertEquals(Utils.mkSet(segments.segmentName(0), - segments.segmentName(1), - segments.segmentName(2)), segmentDirs()); + segments.segmentName(1)), segmentDirs()); final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L)); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L) - ), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + KeyValue.pair(new Windowed<>(key, windows[3]), 100L) + ), results); } @@ -207,8 +227,9 @@ public void shouldFetchAllSegments() { public void shouldLoadSegementsWithOldStyleDateFormattedName() { final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); + + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); bytesStore.close(); final String firstSegmentName = segments.segmentName(0); @@ -222,22 +243,24 @@ public void shouldLoadSegementsWithOldStyleDateFormattedName() { assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName)); bytesStore = 
new RocksDBSegmentedBytesStore(storeName, - retention, - numSegments, - schema); + retention, + numSegments, + schema); bytesStore.init(context, bytesStore); final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L)); - assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)))); + assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + KeyValue.pair(new Windowed<>(key, windows[3]), 100L)))); } + @Test public void shouldLoadSegementsWithOldStyleColonFormattedName() { final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); + + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); bytesStore.close(); final String firstSegmentName = segments.segmentName(0); @@ -247,24 +270,25 @@ public void shouldLoadSegementsWithOldStyleColonFormattedName() { assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName)); bytesStore = new RocksDBSegmentedBytesStore(storeName, - retention, - numSegments, - schema); + retention, + numSegments, + schema); bytesStore.init(context, bytesStore); final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L)); - assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)))); + assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, 
windows[0]), 50L), + KeyValue.pair(new Windowed<>(key, windows[3]), 100L)))); } + @Test public void shouldBeAbleToWriteToReInitializedStore() { final String key = "a"; // need to create a segment so we can attempt to write to it again. - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50)); bytesStore.close(); bytesStore.init(context, bytesStore); - bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100)); } private Set<String> segmentDirs() { @@ -278,20 +302,33 @@ public void shouldBeAbleToWriteToReInitializedStore() { } private Bytes serializeKey(final Windowed<String> key) { - return Bytes.wrap(SessionKeySchema.toBinary(key, Serdes.String().serializer(), "dummy")); + final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); + if (schema instanceof SessionKeySchema) { + return Bytes.wrap(SessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy")); + } else { + return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes); + } } private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) { final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); + final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); while (iterator.hasNext()) { final KeyValue<Bytes, byte[]> next = iterator.next(); - final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair( - SessionKeySchema.from(next.key.get(), Serdes.String().deserializer(), "dummy"), - Serdes.Long().deserializer().deserialize("dummy", next.value) - ); - results.add(deserialized); + if (schema instanceof WindowKeySchema) { + final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair( + WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes), + stateSerdes.valueDeserializer().deserialize("dummy", next.value) + ); + results.add(deserialized); + } else { + final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair( + SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"), + stateSerdes.valueDeserializer().deserialize("dummy", next.value) + ); + results.add(deserialized); + } } return results; } - } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RocksDBSegmentedBytesStoreTest does not cover time window serdes > ---------------------------------------------------------------- > > Key: KAFKA-6628 > URL: https://issues.apache.org/jira/browse/KAFKA-6628 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Liju > Priority: Major > Labels: newbie, unit-test > > The RocksDBSegmentedBytesStoreTest.java only covers session window serdes, > but not time window serdes. We should fill in this coverage gap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)