Repository: kafka Updated Branches: refs/heads/0.11.0 b661d3b8a -> 9170e87b2
http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 5e0e55c..040def6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Test; @@ -62,7 +64,7 @@ public class RocksDBWindowStoreTest { private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L; private final int numSegments = 3; - private final long windowSize = 3; + private final static long WINDOW_SIZE = 3; private final String windowName = "window"; private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL; private final long retentionPeriod = segmentSize * (numSegments - 1); @@ -95,7 +97,8 @@ public class RocksDBWindowStoreTest { @SuppressWarnings("unchecked") private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) { - final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(), windowSize, true, Collections.<String, String>emptyMap(), enableCaching); + final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(), + WINDOW_SIZE, true, Collections.<String, String>emptyMap(), enableCaching); final WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); store.init(context, store); return store; @@ -148,30 +151,30 @@ public class RocksDBWindowStoreTest { putFirstBatch(windowStore, startTime, context); - assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE))); putSecondBatch(windowStore, startTime, context); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - WINDOW_SIZE, startTime - 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L + WINDOW_SIZE))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -189,36 +192,85 @@ public class RocksDBWindowStoreTest { @SuppressWarnings("unchecked") @Test + public void testFetchRange() throws IOException { + windowStore = createWindowStore(context, false, true); + long startTime = segmentSize - 4L; + + putFirstBatch(windowStore, startTime, context); + + final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0); + final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1); + final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2); + final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4); + final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5); + + assertEquals( + Utils.mkList(zero, one), + StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)) + ); + assertEquals( + Utils.mkList(one), + StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)) + ); + assertEquals( + Utils.mkList(one, two), + StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)) + ); + assertEquals( + Utils.mkList(zero, one, two), + StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)) + ); + assertEquals( + Utils.mkList(zero, one, two, + four, five), + StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE + 5L)) + ); + assertEquals( + Utils.mkList(two, four, five), + StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + WINDOW_SIZE + 5L)) + ); + assertEquals( + Utils.mkList(), + StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + WINDOW_SIZE)) + ); + assertEquals( + Utils.mkList(), + StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + WINDOW_SIZE + 5)) + ); + } + + @SuppressWarnings("unchecked") + @Test public void testPutAndFetchBefore() throws IOException { windowStore = createWindowStore(context, false, true); long startTime = segmentSize - 4L; putFirstBatch(windowStore, startTime, context); - assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L))); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L))); putSecondBatch(windowStore, startTime, context); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L))); - assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - WINDOW_SIZE, startTime + 0L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - WINDOW_SIZE, startTime + 13L))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -242,30 +294,30 @@ public class RocksDBWindowStoreTest { putFirstBatch(windowStore, startTime, context); - assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + WINDOW_SIZE))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + WINDOW_SIZE))); putSecondBatch(windowStore, startTime, context); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + WINDOW_SIZE))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + WINDOW_SIZE))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -290,17 +342,17 @@ public class RocksDBWindowStoreTest { context.setRecordContext(createRecordContext(startTime)); windowStore.put(0, "zero"); - assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); windowStore.put(0, "zero"); windowStore.put(0, "zero+"); windowStore.put(0, "zero++"); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -351,12 +403,12 @@ public class RocksDBWindowStoreTest { segments.segmentName(3), segments.segmentName(4)), segmentDirs(baseDir)); - assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); context.setRecordContext(createRecordContext(startTime + incr * 6)); windowStore.put(6, "six"); @@ -365,13 +417,13 @@ public class RocksDBWindowStoreTest { segments.segmentName(5)), segmentDirs(baseDir)); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE))); context.setRecordContext(createRecordContext(startTime + incr * 7)); @@ -380,14 +432,14 @@ public class RocksDBWindowStoreTest { segments.segmentName(4), segments.segmentName(5)), segmentDirs(baseDir)); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE))); context.setRecordContext(createRecordContext(startTime + incr * 8)); windowStore.put(8, "eight"); @@ -396,15 +448,15 @@ public class RocksDBWindowStoreTest { segments.segmentName(6)), segmentDirs(baseDir)); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE))); + assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE))); // check segment directories windowStore.flush(); @@ -449,27 +501,27 @@ public class RocksDBWindowStoreTest { Utils.delete(baseDir); windowStore = createWindowStore(context, false, true); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE))); context.restore(windowName, changeLog); - assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE))); + assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE))); // check segment directories windowStore.flush(); @@ -621,17 +673,19 @@ public class RocksDBWindowStoreTest { @SuppressWarnings("unchecked") @Test public void shouldFetchAndIterateOverExactKeys() throws Exception { + final long windowSize = 0x7a00000000000000L; + final long retentionPeriod = 0x7a00000000000000L; final RocksDBWindowStoreSupplier<String, String> supplier = new RocksDBWindowStoreSupplier<>( - "window", - 0x7a00000000000000L, 2, - true, - Serdes.String(), - Serdes.String(), - 0x7a00000000000000L, - true, - Collections.<String, String>emptyMap(), - false); + "window", + retentionPeriod, 2, + true, + Serdes.String(), + Serdes.String(), + windowSize, + true, + Collections.<String, String>emptyMap(), + false); windowStore = supplier.get(); windowStore.init(context, windowStore); @@ -645,6 +699,19 @@ public class RocksDBWindowStoreTest { final List expected = Utils.mkList("0001", "0003", "0005"); assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); + + List<KeyValue<Windowed<String>, String>> list = StreamsTestUtils.toList(windowStore.fetch("a", "a", 0, Long.MAX_VALUE)); + assertThat(list, equalTo(Utils.mkList( + windowedPair("a", "0001", 0, windowSize), + windowedPair("a", "0003", 1, windowSize), + windowedPair("a", "0005", 0x7a00000000000000L - 1, windowSize) + ))); + + list = StreamsTestUtils.toList(windowStore.fetch("aa", "aa", 0, Long.MAX_VALUE)); + assertThat(list, equalTo(Utils.mkList( + windowedPair("aa", "0002", 0, windowSize), + windowedPair("aa", "0004", 1, windowSize) + ))); } @SuppressWarnings("unchecked") @@ -746,4 +813,12 @@ public class RocksDBWindowStoreTest { return entriesByKey; } + + private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) { + return windowedPair(key, value, timestamp, WINDOW_SIZE); + } + + private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) { + return KeyValue.pair(new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java new file mode 100644 index 0000000..7a26660 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +public class SegmentedCacheFunctionTest { + + private static final int SEGMENT_INTERVAL = 17; + private static final int TIMESTAMP = 736213517; + + private static final Bytes THE_KEY = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42); + private final static Bytes THE_CACHE_KEY = Bytes.wrap( + ByteBuffer.allocate(8 + THE_KEY.get().length) + .putLong(TIMESTAMP / SEGMENT_INTERVAL) + .put(THE_KEY.get()).array() + ); + + private final SegmentedCacheFunction cacheFunction = new SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL); + + @Test + public void key() throws Exception { + assertThat( + cacheFunction.key(THE_CACHE_KEY), + equalTo(THE_KEY) + ); + } + + @Test + public void cacheKey() throws Exception { + final long segmentId = TIMESTAMP / SEGMENT_INTERVAL; + + final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY); + final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get()); + + assertThat(buffer.getLong(), equalTo(segmentId)); + + byte[] actualKey = new byte[buffer.remaining()]; + buffer.get(actualKey); + assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY)); + } + + @Test + public void testRoundTripping() throws Exception { + assertThat( + cacheFunction.key(cacheFunction.cacheKey(THE_KEY)), + equalTo(THE_KEY) + ); + + assertThat( + cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)), + equalTo(THE_CACHE_KEY) + ); + } + + @Test + public void compareSegmentedKeys() throws Exception { + assertThat( + "same key in same segment should be ranked the same", + cacheFunction.compareSegmentedKeys( + cacheFunction.cacheKey(THE_KEY), + THE_KEY + ) == 0 + ); + + final Bytes sameKeyInPriorSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 1234, 42); + + assertThat( + "same keys in different segments should be ordered according to segment", + cacheFunction.compareSegmentedKeys( + cacheFunction.cacheKey(sameKeyInPriorSegment), + THE_KEY + ) < 0 + ); + + assertThat( + "same keys in different segments should be ordered according to segment", + cacheFunction.compareSegmentedKeys( + cacheFunction.cacheKey(THE_KEY), + sameKeyInPriorSegment + ) > 0 + ); + + final Bytes lowerKeyInSameSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0); + + assertThat( + "different keys in same segments should be ordered according to key", + cacheFunction.compareSegmentedKeys( + cacheFunction.cacheKey(THE_KEY), + lowerKeyInSameSegment + ) > 0 + ); + + assertThat( + "different keys in same segments should be ordered according to key", + cacheFunction.compareSegmentedKeys( + cacheFunction.cacheKey(lowerKeyInSameSegment), + THE_KEY + ) < 0 + ); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index 354cf01..4682174 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -52,17 +52,103 @@ public class SessionKeySchemaTest { @Test public void shouldFetchExactKeysSkippingLongerKeys() throws Exception { - final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0}), 0, Long.MAX_VALUE)); + final Bytes key = Bytes.wrap(new byte[]{0}); + final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE)); assertThat(result, equalTo(Arrays.asList(2, 4))); } @Test public void shouldFetchExactKeySkippingShorterKeys() throws Exception { - final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0, 0}), 0, Long.MAX_VALUE); + final Bytes key = Bytes.wrap(new byte[]{0, 0}); + final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE); final List<Integer> results = getValues(hasNextCondition); assertThat(results, equalTo(Arrays.asList(1, 5))); } + @Test + public void testUpperBoundWithLargeTimestamps() throws Exception { + Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + SessionKeySerde.bytesToBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + ) + ) >= 0 + ); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + SessionKeySerde.bytesToBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + ) + ) >= 0 + ); + + assertThat(upper, equalTo(SessionKeySerde.bytesToBinary( + new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) + ); + } + + @Test + public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() throws Exception { + Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + SessionKeySerde.bytesToBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + ) + ) >= 0 + ); + + assertThat(upper, equalTo(SessionKeySerde.bytesToBinary( + new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) + ); + } + + @Test + public void testUpperBoundWithZeroTimestamp() throws Exception { + Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + + assertThat(upper, equalTo(SessionKeySerde.bytesToBinary( + new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))) + ); + } + + @Test + public void testLowerBoundWithZeroTimestamp() throws Exception { + Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); + } + + @Test + public void testLowerBoundMatchesTrailingZeros() throws Exception { + Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); + + assertThat( + "appending zeros to key should still be in range", + lower.compareTo( + SessionKeySerde.bytesToBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + ) + ) < 0 + ); + + assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); + } + private List<Integer> getValues(final HasNextCondition hasNextCondition) { final List<Integer> results = new ArrayList<>(); @@ -72,4 +158,4 @@ public class SessionKeySchemaTest { return results; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java new file mode 100644 index 0000000..4a73e74 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +public class WindowKeySchemaTest { + + private final WindowKeySchema windowKeySchema = new WindowKeySchema(); + + @Test + public void testUpperBoundWithLargeTimestamps() throws Exception { + Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + WindowStoreUtils.toBinaryKey( + new byte[]{0xA}, + Long.MAX_VALUE, + Integer.MAX_VALUE + ) + ) >= 0 + ); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + WindowStoreUtils.toBinaryKey( + new byte[]{0xA, 0xB}, + Long.MAX_VALUE, + Integer.MAX_VALUE + ) + ) >= 0 + ); + + assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA}, Long.MAX_VALUE, Integer.MAX_VALUE))); + } + + @Test + public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() throws Exception { + Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + WindowStoreUtils.toBinaryKey( + new byte[]{0xA, (byte) 0x8F}, + Long.MAX_VALUE, + Integer.MAX_VALUE + ) + ) >= 0 + ); + + assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}, Long.MAX_VALUE, Integer.MAX_VALUE))); + } + + + @Test + public void testUpperBoundWithKeyBytesLargerAndSmallerThanFirstTimestampByte() throws Exception { + Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xC, 0xC, 0x9}), 0x0AffffffffffffffL); + + assertThat( + "shorter key with max timestamp should be in range", + upper.compareTo( + WindowStoreUtils.toBinaryKey( + new byte[]{0xC, 0xC}, + 0x0AffffffffffffffL, + Integer.MAX_VALUE + ) + ) >= 0 + ); + + assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xC, 0xC}, 0x0AffffffffffffffL, Integer.MAX_VALUE))); + } + + @Test + public void testUpperBoundWithZeroTimestamp() throws Exception { + Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE))); + } + + @Test + public void testLowerBoundWithZeroTimestamp() throws Exception { + Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0))); + } + + @Test + public void testLowerBoundWithMonZeroTimestamp() throws Exception { + Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 42); + assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0))); + } + + @Test + public void testLowerBoundMatchesTrailingZeros() throws Exception { + Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE - 1); + + assertThat( + "appending zeros to key should still be in range", + lower.compareTo( + WindowStoreUtils.toBinaryKey( + new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Long.MAX_VALUE - 1, + 0 + ) + ) < 0 + ); + + assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0))); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index f7fd53b..ed408e6 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -26,12 +26,13 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V>, StateStore { - private Map<K, List<KeyValue<Windowed<K>, V>>> sessions = new HashMap<>(); + private NavigableMap<K, List<KeyValue<Windowed<K>, V>>> sessions = new TreeMap<>(); private boolean open = true; public void put(final Windowed<K> sessionKey, final V value) { @@ -53,6 +54,44 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V } @Override + public KeyValueIterator<Windowed<K>, V> fetch(K from, K to) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (!sessions.subMap(from, to).isEmpty()) { + return new KeyValueIteratorStub<>(Collections.<KeyValue<Windowed<K>, V>>emptyIterator()); + } + final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = sessions.subMap(from, to).values().iterator(); + return new KeyValueIteratorStub<>( + new Iterator<KeyValue<Windowed<K>, V>>() { + + Iterator<KeyValue<Windowed<K>, V>> it; + + @Override + public boolean hasNext() { + while (it == null || !it.hasNext()) { + if (!keysIterator.hasNext()) { + return false; + } + it = keysIterator.next().iterator(); + } + return true; + } + + @Override + public KeyValue<Windowed<K>, V> next() { + return it.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + ); + } + + @Override public String name() { return ""; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java index 6ab70ae..7f8457c 100644 --- a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java @@ -50,6 +50,11 @@ public class SegmentedBytesStoreStub implements SegmentedBytesStore { @Override public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) { + return fetch(key, key, from, to); + } + + @Override + public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) { fetchCalled = true; return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator()); }
