[ 
https://issues.apache.org/jira/browse/KAFKA-6628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466252#comment-16466252
 ] 

ASF GitHub Bot commented on KAFKA-6628:
---------------------------------------

guozhangwang closed pull request #4836: KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes
URL: https://github.com/apache/kafka/pull/4836
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index bd2fa9110a5..db6d1d1bf22 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -22,16 +22,25 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.Parameter;
+
+import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
 
 import java.io.File;
 import java.text.SimpleDateFormat;
@@ -51,32 +60,58 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-// TODO: this test does not cover time window serdes
+
+@RunWith(Parameterized.class)
 public class RocksDBSegmentedBytesStoreTest {
 
-    private final long retention = 60000L;
+    private final long retention = 1000;
     private final int numSegments = 3;
     private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
-    private final SessionKeySchema schema = new SessionKeySchema();
+    private long windowSizeForTimeWindow = 500;
+    private final Window[] windows = new Window[4];
+
+    @Parameter
+    public SegmentedBytesStore.KeySchema schema;
+
+    @Parameters(name = "{0}")
+    public static Object[] getKeySchemas() {
+        return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+    }
 
     @Before
     public void before() {
         schema.init("topic");
+
+        if (schema instanceof SessionKeySchema) {
+            windows[0] = new SessionWindow(10, 10);
+            windows[1] = new SessionWindow(500, 1000);
+            windows[2] = new SessionWindow(1000, 1500);
+            windows[3] = new SessionWindow(30000, 60000);
+        }
+        if (schema instanceof WindowKeySchema) {
+
+            windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
+            windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
+            windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
+            windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+        }
+
+
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    numSegments,
-                                                    schema);
+                retention,
+                numSegments,
+                schema);
 
         stateDir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(
-            stateDir,
-            Serdes.String(),
-            Serdes.Long(),
-            new NoOpRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
+                stateDir,
+                Serdes.String(),
+                Serdes.Long(),
+                new NoOpRecordCollector(),
+                new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 
@@ -88,94 +123,83 @@ public void close() {
     @Test
     public void shouldPutAndFetch() {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 
10L))), serializeValue(10L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(500L, 1000L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(1500L, 2000L))), serializeValue(100L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(2500L, 3000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), 
serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), 
serializeValue(100));
+
+        final KeyValueIterator<Bytes, byte[]> values = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
 
-        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 
10L),
-                                                                               
     KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
 
-        final KeyValueIterator<Bytes, byte[]> values = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L);
         assertEquals(expected, toList(values));
     }
 
     @Test
     public void shouldFindValuesWithinRange() {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(1000L, 1000L))), serializeValue(10L));
-        final KeyValueIterator<Bytes, byte[]> results = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L);
-        assertEquals(Collections.singletonList(KeyValue.pair(new 
Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), 
serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), 
serializeValue(100));
+        final KeyValueIterator<Bytes, byte[]> results = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
+        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+
+        assertEquals(expected, toList(results));
     }
 
+
     @Test
     public void shouldRemove() {
-        bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 
1000))), serializeValue(30L));
-        bytesStore.put(serializeKey(new Windowed<>("a", new 
SessionWindow(1500, 2500))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), 
serializeValue(30));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), 
serializeValue(50));
 
-        bytesStore.remove(serializeKey(new Windowed<>("a", new 
SessionWindow(0, 1000))));
-        final KeyValueIterator<Bytes, byte[]> value = 
bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L);
+        bytesStore.remove(serializeKey(new Windowed<>("a", windows[0])));
+        final KeyValueIterator<Bytes, byte[]> value = 
bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 100);
         assertFalse(value.hasNext());
     }
 
+
     @Test
     public void shouldRollSegments() {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
-
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(61000L, 120000L))), serializeValue(200L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), 
serializeValue(100));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), 
serializeValue(500));
+        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(121000L, 180000L))), serializeValue(300L));
-        assertEquals(Utils.mkSet(segments.segmentName(1),
-                                 segments.segmentName(2),
-                                 segments.segmentName(3)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), 
serializeValue(1000));
+        assertEquals(Utils.mkSet(segments.segmentName(0), 
segments.segmentName(1)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(181000L, 240000L))), serializeValue(400L));
-        assertEquals(Utils.mkSet(segments.segmentName(2),
-                                 segments.segmentName(3),
-                                 segments.segmentName(4)), segmentDirs());
+        final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
 
-        final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new 
SessionWindow(61000L, 120000L)), 200L),
-                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(121000L, 180000L)), 300L),
-                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(181000L, 240000L)), 400L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, 
windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
+                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), 
results);
 
     }
 
+
     @Test
     public void shouldGetAllSegments() {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(61000L, 120000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), 
serializeValue(100L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+                segments.segmentName(1)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.all());
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new 
SessionWindow(0L, 0L)), 50L),
-                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(30000L, 60000L)), 100L),
-                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(61000L, 120000L)), 200L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, 
windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+        ), results);
 
     }
 
@@ -184,22 +208,18 @@ public void shouldFetchAllSegments() {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(61000L, 120000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), 
serializeValue(100L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+                segments.segmentName(1)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetchAll(0L, 60000L));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new 
SessionWindow(0L, 0L)), 50L),
-                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(30000L, 60000L)), 100L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, 
windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+        ), results);
 
     }
 
@@ -207,8 +227,9 @@ public void shouldFetchAllSegments() {
     public void shouldLoadSegementsWithOldStyleDateFormattedName() {
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), 
serializeValue(100L));
         bytesStore.close();
 
         final String firstSegmentName = segments.segmentName(0);
@@ -222,22 +243,24 @@ public void 
shouldLoadSegementsWithOldStyleDateFormattedName() {
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    numSegments,
-                                                    schema);
+                retention,
+                numSegments,
+                schema);
 
         bytesStore.init(context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new 
Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-                                                  KeyValue.pair(new 
Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new 
Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
     }
 
+
     @Test
     public void shouldLoadSegementsWithOldStyleColonFormattedName() {
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), 
serializeValue(100L));
         bytesStore.close();
 
         final String firstSegmentName = segments.segmentName(0);
@@ -247,24 +270,25 @@ public void 
shouldLoadSegementsWithOldStyleColonFormattedName() {
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-            retention,
-            numSegments,
-            schema);
+                retention,
+                numSegments,
+                schema);
 
         bytesStore.init(context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new 
Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-            KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 
60000L)), 100L))));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new 
Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
     }
 
+
     @Test
     public void shouldBeAbleToWriteToReInitializedStore() {
         final String key = "a";
         // need to create a segment so we can attempt to write to it again.
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), 
serializeValue(50));
         bytesStore.close();
         bytesStore.init(context, bytesStore);
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), 
serializeValue(100));
     }
 
     private Set<String> segmentDirs() {
@@ -278,20 +302,33 @@ public void shouldBeAbleToWriteToReInitializedStore() {
     }
 
     private Bytes serializeKey(final Windowed<String> key) {
-        return Bytes.wrap(SessionKeySchema.toBinary(key, 
Serdes.String().serializer(), "dummy"));
+        final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+        if (schema instanceof SessionKeySchema) {
+            return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), "dummy"));
+        } else {
+            return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
+        }
     }
 
     private List<KeyValue<Windowed<String>, Long>> toList(final 
KeyValueIterator<Bytes, byte[]> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new 
ArrayList<>();
+        final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
         while (iterator.hasNext()) {
             final KeyValue<Bytes, byte[]> next = iterator.next();
-            final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
-                    SessionKeySchema.from(next.key.get(), 
Serdes.String().deserializer(), "dummy"),
-                    Serdes.Long().deserializer().deserialize("dummy", 
next.value)
-            );
-            results.add(deserialized);
+            if (schema instanceof WindowKeySchema) {
+                final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
+                        WindowKeySchema.fromStoreKey(next.key.get(), 
windowSizeForTimeWindow, stateSerdes),
+                        stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
+                );
+                results.add(deserialized);
+            } else {
+                final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
+                        SessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), "dummy"),
+                        stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
+                );
+                results.add(deserialized);
+            }
         }
         return results;
     }
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RocksDBSegmentedBytesStoreTest does not cover time window serdes
> ----------------------------------------------------------------
>
>                 Key: KAFKA-6628
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6628
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liju
>            Priority: Major
>              Labels: newbie, unit-test
>
> RocksDBSegmentedBytesStoreTest.java covers only session window serdes, 
> not time window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to