http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java new file mode 100644 index 0000000..c107c3e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Merger; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@SuppressWarnings("unchecked") +public class KStreamSessionWindowAggregateProcessorTest { + + private static final long GAP_MS = 5 * 60 * 1000L; + private static final String STORE_NAME = "session-store"; + + private final Initializer<Long> initializer = new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }; + private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() { + @Override + public Long apply(final String aggKey, final String value, final Long aggregate) { + return aggregate + 1; + } + }; + private final Merger<String, Long> sessionMerger = new Merger<String, Long>() { + @Override + public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) { + return aggOne + aggTwo; + } + }; + private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = + new KStreamSessionWindowAggregate<>(SessionWindows.with(GAP_MS).until(3 * GAP_MS), + STORE_NAME, + initializer, + aggregator, + sessionMerger); + + private final List<KeyValue> results = new ArrayList<>(); + private Processor<String, String> processor = sessionAggregator.get(); + private SessionStore<String, Long> sessionStore; + private MockProcessorContext context; + + + @SuppressWarnings("unchecked") + @Before + public void initializeStore() { + final File stateDir = TestUtils.tempDirectory(); + context = new MockProcessorContext(new KStreamTestDriver(new KStreamBuilder(), stateDir), stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(100000)) { + @Override + public <K, V> void forward(final K key, final V value) { + results.add(KeyValue.pair(key, value)); + } + }; + + initStore(true); + processor.init(context); + } + + private void initStore(final boolean enableCaching) { + final RocksDBSessionStoreSupplier<String, Long> supplier = + new RocksDBSessionStoreSupplier<>(STORE_NAME, + GAP_MS * 3, + Serdes.String(), + Serdes.Long(), + false, + Collections.<String, String>emptyMap(), + enableCaching); + sessionStore = (SessionStore<String, Long>) supplier.get(); + sessionStore.init(context, sessionStore); + } + + @After + public void closeStore() { + sessionStore.close(); + } + + @Test + public void shouldCreateSingleSessionWhenWithinGap() throws Exception { + context.setTime(0); + processor.process("john", "first"); + context.setTime(500); + processor.process("john", "second"); + + final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge("john", 0, 2000); + assertTrue(values.hasNext()); + assertEquals(Long.valueOf(2), values.next().value); + } + + + @Test + public void shouldMergeSessions() throws Exception { + context.setTime(0); + final String sessionId = "mel"; + processor.process(sessionId, "first"); + assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext()); + + // move time beyond gap + context.setTime(GAP_MS + 1); + processor.process(sessionId, "second"); + assertTrue(sessionStore.findSessionsToMerge(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext()); + // should still exist as not within gap + assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext()); + // move time back + context.setTime(GAP_MS / 2); + processor.process(sessionId, "third"); + + final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge(sessionId, 0, GAP_MS + 1); + final KeyValue<Windowed<String>, Long> kv = iterator.next(); + + assertEquals(Long.valueOf(3), kv.value); + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldUpdateSessionIfTheSameTime() throws Exception { + context.setTime(0); + processor.process("mel", "first"); + processor.process("mel", "second"); + final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge("mel", 0, 0); + assertEquals(Long.valueOf(2L), iterator.next().value); + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() throws Exception { + final String sessionId = "mel"; + long time = 0; + context.setTime(time); + processor.process(sessionId, "first"); + context.setTime(time += GAP_MS + 1); + processor.process(sessionId, "second"); + processor.process(sessionId, "second"); + context.setTime(time += GAP_MS + 1); + processor.process(sessionId, "third"); + processor.process(sessionId, "third"); + processor.process(sessionId, "third"); + + sessionStore.flush(); + assertEquals(Arrays.asList( + KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(time, time)), new Change<>(3L, null)) + + ), results); + + } + + + @Test + public void shouldRemoveMergedSessionsFromStateStore() throws Exception { + context.setTime(0); + processor.process("a", "1"); + + // first ensure it is in the store + final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessionsToMerge("a", 0, 0); + assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next()); + + context.setTime(100); + processor.process("a", "2"); + // a1 from above should have been removed + // should have merged session in store + final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessionsToMerge("a", 0, 100); + assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next()); + assertFalse(a2.hasNext()); + } + + @Test + public void shouldHandleMultipleSessionsAndMerging() throws Exception { + context.setTime(0); + processor.process("a", "1"); + processor.process("b", "1"); + processor.process("c", "1"); + processor.process("d", "1"); + context.setTime(GAP_MS / 2); + processor.process("d", "2"); + context.setTime(GAP_MS + 1); + processor.process("a", "2"); + processor.process("b", "2"); + context.setTime(GAP_MS + 1 + GAP_MS / 2); + processor.process("a", "3"); + processor.process("c", "3"); + + sessionStore.flush(); + + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("d", new TimeWindow(0, GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("b", new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("c", new TimeWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) + ), + results); + } + + + @Test + public void shouldGetAggregatedValuesFromValueGetter() throws Exception { + final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get(); + getter.init(context); + context.setTime(0); + processor.process("a", "1"); + context.setTime(GAP_MS + 1); + processor.process("a", "1"); + processor.process("a", "2"); + final long t0 = getter.get(new Windowed<>("a", new TimeWindow(0, 0))); + final long t1 = getter.get(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1))); + assertEquals(1L, t0); + assertEquals(2L, t1); + } + + @Test + public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() throws Exception { + initStore(false); + processor.init(context); + + context.setTime(0); + processor.process("a", "1"); + processor.process("b", "1"); + processor.process("c", "1"); + + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null))), results); + } + + @Test + public void shouldImmediatelyForwardRemovedSessionsWhenMerging() throws Exception { + initStore(false); + processor.init(context); + + context.setTime(0); + processor.process("a", "1"); + context.setTime(5); + processor.process("a", "1"); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(null, null)), + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 5)), new Change<>(2L, null))), results); + + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java new file mode 100644 index 0000000..2f5972c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class SessionKeySerdeTest { + + @Test + public void shouldSerializeDeserialize() throws Exception { + final Windowed<Long> key = new Windowed<>(1L, new TimeWindow(10, 100)); + final SessionKeySerde<Long> serde = new SessionKeySerde<>(Serdes.Long()); + final byte[] bytes = serde.serializer().serialize("t", key); + final Windowed<Long> result = serde.deserializer().deserialize("t", bytes); + assertEquals(key, result); + } + + @Test + public void shouldSerializeNullToNull() throws Exception { + final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String()); + assertNull(serde.serializer().serialize("t", null)); + } + + @Test + public void shouldDeSerializeEmtpyByteArrayToNull() throws Exception { + final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String()); + assertNull(serde.deserializer().deserialize("t", new byte[0])); + } + + @Test + public void shouldDeSerializeNullToNull() throws Exception { + final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String()); + assertNull(serde.deserializer().deserialize("t", null)); + } + + @Test + public void shouldConvertToBinaryAndBack() throws Exception { + final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 20)); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); + assertEquals(key, result); + } + + @Test + public void shouldExtractEndTimeFromBinary() throws Exception { + final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 100)); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + assertEquals(100, SessionKeySerde.extractEnd(serialized.get())); + } + + @Test + public void shouldExtractStartTimeFromBinary() throws Exception { + final Windowed<String> key = new Windowed<>("key", new TimeWindow(50, 100)); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + assertEquals(50, SessionKeySerde.extractStart(serialized.get())); + } + + @Test + public void shouldExtractKeyBytesFromBinary() throws Exception { + final Windowed<String> key = new Windowed<>("blah", new TimeWindow(50, 100)); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java new file mode 100644 index 0000000..cb6f87e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionKeySerde; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class CachingSessionStoreTest { + + private static final int MAX_CACHE_SIZE_BYTES = 600; + private final StateSerdes<String, Long> serdes = + new StateSerdes<>("name", Serdes.String(), Serdes.Long()); + private RocksDBSegmentedBytesStore underlying; + private CachingSessionStore<String, Long> cachingStore; + private ThreadCache cache; + private static final Long DEFAULT_TIMESTAMP = 10L; + + @Before + public void setUp() throws Exception { + underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema()); + cachingStore = new CachingSessionStore<>(underlying, + Serdes.String(), + Serdes.Long()); + cache = new ThreadCache(MAX_CACHE_SIZE_BYTES); + final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); + context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); + cachingStore.init(context, cachingStore); + } + + @Test + public void shouldPutFetchFromCache() throws Exception { + cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L); + + final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessionsToMerge("a", 0, 0); + final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessionsToMerge("b", 0, 0); + + assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next()); + assertFalse(a.hasNext()); + assertFalse(b.hasNext()); + assertEquals(3, cache.size()); + } + + @Test + public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { + + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + for (KeyValue<Windowed<String>, Long> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + + + final List<KeyValue<Windowed<String>, Long>> results = toList(cachingStore.fetch("a")); + assertEquals(expected, results); + + } + + @Test + public void shouldFlushItemsToStoreOnEviction() throws Exception { + final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a", "b", "c", "d"); + assertEquals(added.size() - 1, cache.size()); + final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0); + final KeyValue<Bytes, byte[]> next = iterator.next(); + assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer())); + assertArrayEquals(serdes.rawValue(added.get(0).value), next.value); + } + + @Test + public void shouldQueryItemsInCacheAndStore() throws Exception { + final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a"); + final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessionsToMerge("a", 0, added.size() * 10); + final List<KeyValue<Windowed<String>, Long>> actual = toList(iterator); + assertEquals(added, actual); + } + + @Test + public void shouldRemove() throws Exception { + final Windowed<String> a = new Windowed<>("a", new TimeWindow(0, 0)); + final Windowed<String> b = new Windowed<>("b", new TimeWindow(0, 0)); + cachingStore.put(a, 2L); + cachingStore.put(b, 2L); + cachingStore.flush(); + cachingStore.remove(a); + cachingStore.flush(); + final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessionsToMerge("a", 0, 0); + assertFalse(rangeIter.hasNext()); + } + + @Test + public void shouldFetchCorrectlyAcrossSegments() throws Exception { + final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0)); + final Windowed<String> a2 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); + final Windowed<String> a3 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, 1L); + cachingStore.put(a2, 2L); + cachingStore.put(a3, 3L); + cachingStore.flush(); + final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessionsToMerge("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2); + assertEquals(a1, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a3, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldClearNamespaceCacheOnClose() throws Exception { + final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0)); + cachingStore.put(a1, 1L); + assertEquals(1, cache.size()); + cachingStore.close(); + assertEquals(0, cache.size()); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.fetch("a"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.findSessionsToMerge("a", 0, Long.MAX_VALUE); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.remove(new Windowed<>("a", new TimeWindow(0, 0))); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); + } + + private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(final String...sessionIds) { + final Random random = new Random(); + final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); + while (cache.size() == results.size()) { + final String sessionId = sessionIds[random.nextInt(sessionIds.length)]; + addSingleSession(sessionId, results); + } + return results; + } + + private void addSingleSession(final String sessionId, final List<KeyValue<Windowed<String>, Long>> allSessions) { + final int timestamp = allSessions.size() * 10; + final Windowed<String> key = new Windowed<>(sessionId, new TimeWindow(timestamp, timestamp)); + final Long value = 1L; + cachingStore.put(key, value); + allSessions.add(KeyValue.pair(key, value)); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 023fea6..427798d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.TestUtils; @@ -44,16 +45,18 @@ public class CachingWindowStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 150; private static final Long WINDOW_SIZE = 10000L; - private RocksDBWindowStore<Bytes, byte[]> underlying; + private RocksDBSegmentedBytesStore underlying; private CachingWindowStore<String, String> cachingStore; private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener; private ThreadCache cache; private String topic; - private static final Long DEFAULT_TIMESTAMP = 10L; + private static final long DEFAULT_TIMESTAMP = 10L; + private WindowStoreKeySchema keySchema; @Before public void setUp() throws Exception { - underlying = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.Bytes(), Serdes.ByteArray()); + keySchema = new WindowStoreKeySchema(); + underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema); cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); cachingStore = new CachingWindowStore<>(underlying, Serdes.String(), @@ -85,9 +88,9 @@ public class CachingWindowStoreTest { public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception { int added = addItemsToCache(); // all dirty entries should have been flushed - final WindowStoreIterator<byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP); - final KeyValue<Long, byte[]> next = iter.next(); - assertEquals(DEFAULT_TIMESTAMP, next.key); + final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP); + final KeyValue<Bytes, byte[]> next = iter.next(); + assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(next.key)); assertArrayEquals("0".getBytes(), next.value); assertFalse(iter.hasNext()); assertEquals(added - 1, cache.size()); @@ -143,7 +146,8 @@ public class CachingWindowStoreTest { @Test public void shouldIterateCacheAndStore() throws Exception { - underlying.put(Bytes.wrap("1".getBytes()), "a".getBytes()); + final Bytes key = Bytes.wrap("1" .getBytes()); + underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES)), "a".getBytes()); cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE); assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next()); http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java new file mode 100644 index 0000000..d4f9e47 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.SegmentedBytesStoreStub; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ChangeLoggingSegmentedBytesStoreTest { + + private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub(); + private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(bytesStore); + private final Map sent = new HashMap<>(); + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + sent.put(record.key(), record.value()); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + collector, + new ThreadCache(0)); + context.setTime(0); + store.init(context, store); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldLogPuts() throws Exception { + final byte[] value1 = {0}; + final byte[] value2 = {1}; + final Bytes key1 = Bytes.wrap(value1); + final Bytes key2 = Bytes.wrap(value2); + store.put(key1, value1); + store.put(key2, value2); + store.flush(); + assertArrayEquals(value1, (byte[]) sent.get(key1)); + assertArrayEquals(value2, (byte[]) sent.get(key2)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldLogRemoves() throws Exception { + final Bytes key1 = Bytes.wrap(new byte[]{0}); + final Bytes key2 = Bytes.wrap(new byte[]{1}); + store.remove(key1); + store.remove(key2); + store.flush(); + assertTrue(sent.containsKey(key1)); + assertTrue(sent.containsKey(key2)); + assertNull(sent.get(key1)); + assertNull(sent.get(key2)); + } + + @Test + public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception { + store.fetch(Bytes.wrap(new byte[0]), 1, 1); + assertTrue(bytesStore.fetchCalled); + } + + @Test + public void shouldFlushUnderlyingStore() throws Exception { + store.flush(); + assertTrue(bytesStore.flushed); + } + + @Test + public void shouldCloseUnderlyingStore() throws Exception { + store.close(); + assertTrue(bytesStore.closed); + } + + @Test + public void shouldInitUnderlyingStore() throws Exception { + assertTrue(bytesStore.initialized); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java new file mode 100644 index 0000000..fc4a4c5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.test.ReadOnlySessionStoreStub; +import org.apache.kafka.test.StateStoreProviderStub; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class CompositeReadOnlySessionStoreTest { + + private final String storeName = "session-store"; + private final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); + private final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); + private final ReadOnlySessionStoreStub<String, Long> underlyingSessionStore = new ReadOnlySessionStoreStub<>(); + private final ReadOnlySessionStoreStub<String, Long> otherUnderlyingStore = new ReadOnlySessionStoreStub<>(); + private CompositeReadOnlySessionStore<String, Long> sessionStore; + + @Before + public void before() { + stubProviderOne.addStore(storeName, underlyingSessionStore); + stubProviderOne.addStore("other-session-store", otherUnderlyingStore); + + + sessionStore = new CompositeReadOnlySessionStore<>( + new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)), + QueryableStoreTypes.<String, Long>sessionStore(), storeName); + } + + @Test + public void shouldFetchResulstFromUnderlyingSessionStore() throws Exception { + underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); + underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(10, 10)), 2L); + + final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a")); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L)), + results); + } + + @Test + public void shouldReturnEmptyIteratorIfNoData() throws Exception { + final KeyValueIterator<Windowed<String>, Long> result = sessionStore.fetch("b"); + assertFalse(result.hasNext()); + } + + @Test + public void shouldFindValueForKeyWhenMultiStores() throws Exception { + final ReadOnlySessionStoreStub<String, Long> secondUnderlying = new + ReadOnlySessionStoreStub<>(); + stubProviderTwo.addStore(storeName, secondUnderlying); + + final Windowed<String> keyOne = new Windowed<>("key-one", new TimeWindow(0, 0)); + final Windowed<String> keyTwo = new Windowed<>("key-two", new TimeWindow(0, 0)); + underlyingSessionStore.put(keyOne, 0L); + secondUnderlying.put(keyTwo, 10L); + + final List<KeyValue<Windowed<String>, Long>> keyOneResults = toList(sessionStore.fetch("key-one")); + final List<KeyValue<Windowed<String>, Long>> keyTwoResults = toList(sessionStore.fetch("key-two")); + + assertEquals(Collections.singletonList(KeyValue.pair(keyOne, 0L)), keyOneResults); + assertEquals(Collections.singletonList(KeyValue.pair(keyTwo, 10L)), keyTwoResults); + } + + @Test + public void shouldNotGetValueFromOtherStores() throws Exception { + final Windowed<String> expectedKey = new Windowed<>("foo", new TimeWindow(0, 0)); + otherUnderlyingStore.put(new Windowed<>("foo", new TimeWindow(10, 10)), 10L); + underlyingSessionStore.put(expectedKey, 1L); + + final KeyValueIterator<Windowed<String>, Long> result = sessionStore.fetch("foo"); + assertEquals(KeyValue.pair(expectedKey, 1L), result.next()); + assertFalse(result.hasNext()); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception { + final CompositeReadOnlySessionStore<String, String> store + = new CompositeReadOnlySessionStore<>(new StateStoreProviderStub(true), + QueryableStoreTypes.<String, String>sessionStore(), + "whateva"); + + store.fetch("a"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception { + underlyingSessionStore.setOpen(false); + underlyingSessionStore.fetch("key"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index 4a32187..bca4837 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -27,17 +27,18 @@ import static org.junit.Assert.assertTrue; public class DelegatingPeekingKeyValueIteratorTest { + private final String name = "name"; private InMemoryKeyValueStore<String, String> store; @Before public void setUp() throws Exception { - store = new InMemoryKeyValueStore<>("name"); + store = new InMemoryKeyValueStore<>(name); } @Test public void shouldPeekNext() throws Exception { store.put("A", "A"); - final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all()); + final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); assertEquals("A", peekingIterator.peekNextKey()); assertEquals("A", peekingIterator.peekNextKey()); assertTrue(peekingIterator.hasNext()); @@ -50,7 +51,7 @@ public class DelegatingPeekingKeyValueIteratorTest { store.put(kv, kv); } - final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all()); + final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); int index = 0; while (peekingIterator.hasNext()) { final String peekNext = peekingIterator.peekNextKey(); @@ -64,13 +65,13 @@ public class DelegatingPeekingKeyValueIteratorTest { @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception { - final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all()); + final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); peekingIterator.next(); } @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception { - final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all()); + final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); peekingIterator.peekNextKey(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java deleted file mode 100644 index 3f251b3..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.MockProcessorContext; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class DelegatingPeekingWindowIteratorTest { - - private static final long DEFAULT_TIMESTAMP = 0L; - private WindowStore<String, String> store; - - @Before - public void setUp() throws Exception { - store = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.String(), Serdes.String()); - final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, null); - context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); - store.init(context, store); - } - - @Test - public void shouldPeekNext() throws Exception { - final KeyValue<Long, String> expected = KeyValue.pair(DEFAULT_TIMESTAMP, "A"); - store.put("A", "A"); - final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("A", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP)); - assertEquals(expected, peekingIterator.peekNext()); - assertEquals(expected, peekingIterator.peekNext()); - assertTrue(peekingIterator.hasNext()); - } - - @Test - public void shouldPeekAndIterate() throws Exception { - final List<KeyValue<Long, String>> expected = new ArrayList<>(); - for (long t = 0; t < 50; t += 10) { - store.put("a", String.valueOf(t), t); - expected.add(KeyValue.pair(t, String.valueOf(t))); - } - final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("a", 0, 50)); - int index = 0; - while (peekingIterator.hasNext()) { - final KeyValue<Long, String> peekNext = peekingIterator.peekNext(); - final KeyValue<Long, String> key = peekingIterator.next(); - assertEquals(expected.get(index), peekNext); - assertEquals(expected.get(index), key); - index++; - } - assertEquals(expected.size(), index); - } - - @Test(expected = NoSuchElementException.class) - public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception { - final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10)); - peekingIterator.next(); - } - - @Test(expected = NoSuchElementException.class) - public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception { - final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10)); - peekingIterator.peekNext(); - } - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index c7a1a2c..6bb27b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -103,12 +103,12 @@ class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { @Override public KeyValueIterator<K, V> range(final K from, final K to) { - return new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator()); + return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator())); } @Override public KeyValueIterator<K, V> all() { - return new TheIterator(map.entrySet().iterator()); + return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(map.entrySet().iterator())); } private class TheIterator implements KeyValueIterator<K, V> { @@ -125,6 +125,11 @@ class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey not supported"); + } + + @Override public boolean hasNext() { return underlying.hasNext(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java index dee2593..db391ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.junit.Before; @@ -50,7 +51,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { final Bytes from = Bytes.wrap(new byte[]{2}); final Bytes to = Bytes.wrap(new byte[]{9}); - final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.range(from, to)); + final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get()); final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes); @@ -138,9 +139,40 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { } + @Test + public void shouldPeekNextKey() throws Exception { + final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one"); + final ThreadCache cache = new ThreadCache(1000000L); + byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; + final String namespace = "one"; + for (int i = 0; i < bytes.length - 1; i += 2) { + kv.put(Bytes.wrap(bytes[i]), bytes[i]); + cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1])); + } + + final Bytes from = Bytes.wrap(new byte[]{2}); + final Bytes to = Bytes.wrap(new byte[]{9}); + final KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get()); + + final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = + new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, + storeIterator, + serdes); + final byte[][] values = new byte[8][]; + int index = 0; + int bytesIndex = 2; + while (iterator.hasNext()) { + final byte[] keys = iterator.peekNextKey(); + values[index++] = keys; + assertArrayEquals(bytes[bytesIndex++], keys); + iterator.next(); + } + } + private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() { final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace); - final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.all()); + final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all()); return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java index 9ee8b29..c33f174 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java @@ -18,13 +18,14 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.test.KeyValueIteratorStub; import org.junit.Test; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import static org.junit.Assert.assertArrayEquals; @@ -34,16 +35,17 @@ public class MergedSortedCacheWindowStoreIteratorTest { @Test public void shouldIterateOverValueFromBothIterators() throws Exception { - final List<KeyValue<Long, byte[]>> storeValues = new ArrayList<>(); + final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>(); final ThreadCache cache = new ThreadCache(1000000L); final String namespace = "one"; final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String()); final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>(); for (long t = 0; t < 100; t += 20) { - final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, String.valueOf(t).getBytes()); + final byte[] v1Bytes = String.valueOf(t).getBytes(); + final KeyValue<Bytes, byte[]> v1 = KeyValue.pair(Bytes.wrap(WindowStoreUtils.toBinaryKey("a", t, 0, stateSerdes)), v1Bytes); storeValues.add(v1); - expectedKvPairs.add(v1); + expectedKvPairs.add(KeyValue.pair(t, v1Bytes)); final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes); final byte[] valBytes = String.valueOf(t + 10).getBytes(); expectedKvPairs.add(KeyValue.pair(t + 10, valBytes)); @@ -52,11 +54,11 @@ public class MergedSortedCacheWindowStoreIteratorTest { byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); - final PeekingWindowIterator<byte[]> storeIterator = new DelegatingPeekingWindowIterator<>(new WindowStoreIteratorStub(storeValues.iterator())); + final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(storeValues.iterator())); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo); - final MergedSortedCachedWindowStoreIterator<byte[], byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray())); + final MergedSortedCachedWindowStoreIterator<Bytes, byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Bytes(), Serdes.ByteArray())); int index = 0; while (iterator.hasNext()) { final KeyValue<Long, byte[]> next = iterator.next(); @@ -66,32 +68,4 @@ public class MergedSortedCacheWindowStoreIteratorTest { } } - private static class WindowStoreIteratorStub implements WindowStoreIterator<byte[]> { - - private final Iterator<KeyValue<Long, byte[]>> iterator; - - public WindowStoreIteratorStub(final Iterator<KeyValue<Long, byte[]>> iterator) { - this.iterator = iterator; - } - - @Override - public void close() { - //no-op - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public KeyValue<Long, byte[]> next() { - return iterator.next(); - } - - @Override - public void remove() { - - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java new file mode 100644 index 0000000..1587f13 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.SegmentedBytesStoreStub; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +public class MeteredSegmentedBytesStoreTest { + private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub(); + private final MeteredSegmentedBytesStore store = new MeteredSegmentedBytesStore(bytesStore, "scope", new MockTime()); + private final Set<String> latencyRecorded = new HashSet<>(); + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + final Metrics metrics = new Metrics(); + final StreamsMetrics streamsMetrics = new StreamsMetrics() { + + @Override + public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) { + return metrics.sensor(operationName); + } + + @Override + public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { + latencyRecorded.add(sensor.name()); + } + }; + + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + new NoOpRecordCollector(), + new ThreadCache(0)) { + @Override + public StreamsMetrics metrics() { + return streamsMetrics; + } + }; + store.init(context, store); + } + + @Test + public void shouldRecordRestoreLatencyOnInit() throws Exception { + assertTrue(latencyRecorded.contains("restore")); + assertTrue(bytesStore.initialized); + } + + @Test + public void shouldRecordPutLatency() throws Exception { + store.put(Bytes.wrap(new byte[0]), new byte[0]); + assertTrue(latencyRecorded.contains("put")); + assertTrue(bytesStore.putCalled); + } + + @Test + public void shouldRecordFetchLatency() throws Exception { + store.fetch(Bytes.wrap(new byte[0]), 1, 1).close(); // recorded on close; + assertTrue(latencyRecorded.contains("fetch")); + assertTrue(bytesStore.fetchCalled); + } + + @Test + public void shouldRecordRemoveLatency() throws Exception { + store.remove(null); + assertTrue(latencyRecorded.contains("remove")); + assertTrue(bytesStore.removeCalled); + } + + @Test + public void shouldRecordFlushLatency() throws Exception { + store.flush(); + assertTrue(latencyRecorded.contains("flush")); + assertTrue(bytesStore.flushed); + } + + @Test + public void shouldRecordGetLatency() throws Exception { + store.get(null); + assertTrue(latencyRecorded.contains("get")); + assertTrue(bytesStore.getCalled); + } + + @Test + public void shouldCloseUnderlyingStore() throws Exception { + store.close(); + assertTrue(bytesStore.closed); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 0a782d5..99deb50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -31,6 +31,7 @@ import java.util.List; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -200,6 +201,21 @@ public class NamedCacheTest { } @Test + public void shouldRemoveDeletedValuesOnFlush() throws Exception { + cache.setListener(new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) { + // no-op + } + }); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, true, 0, 0, 0, "")); + cache.flush(); + assertEquals(1, cache.size()); + assertNotNull(cache.get(Bytes.wrap(new byte[]{1}))); + } + + @Test public void shouldBeReentrantAndNotBreakLRU() throws Exception { final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, ""); final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3}); @@ -259,4 +275,5 @@ public class NamedCacheTest { cache.put(key, dirty); cache.evict(); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java new file mode 100644 index 0000000..d4c81c3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionKeySerde; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class RocksDBSegmentedBytesStoreTest { + + private final long retention = 60000L; + private final int numSegments = 3; + private final String storeName = "bytes-store"; + private RocksDBSegmentedBytesStore bytesStore; + private File stateDir; + + @Before + public void before() { + + bytesStore = new RocksDBSegmentedBytesStore(storeName, + retention, + numSegments, + new SessionKeySchema()); + + stateDir = TestUtils.tempDirectory(); + final MockProcessorContext context = new MockProcessorContext(null, + stateDir, + Serdes.String(), + Serdes.Long(), + new NoOpRecordCollector(), + new ThreadCache(0)); + bytesStore.init(context, bytesStore); + } + + @After + public void close() { + bytesStore.close(); + } + + @Test + public void shouldPutAndFetch() throws Exception { + final String key = "a"; + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(10, 10L))), serializeValue(10L)); + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(500L, 1000L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1500L, 2000L))), serializeValue(100L)); + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(2500L, 3000L))), serializeValue(200L)); + + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(10, 10)), 10L), + KeyValue.pair(new Windowed<>(key, new TimeWindow(500, 1000)), 50L)); + + final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L); + assertEquals(expected, toList(values)); + } + + @Test + public void shouldFindValuesWithinRange() throws Exception { + final String key = "a"; + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1000L, 1000L))), serializeValue(10L)); + final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L); + assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 10L)), toList(results)); + } + + @Test + public void shouldRemove() throws Exception { + bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000))), serializeValue(30L)); + bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(1500, 2500))), serializeValue(50L)); + + bytesStore.remove(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000)))); + final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L); + assertFalse(value.hasNext()); + } + + @Test + public void shouldRollSegments() throws Exception { + // just to validate directories + final Segments segments = new Segments(storeName, retention, numSegments); + final String key = "a"; + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); + assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); + + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(30000L, 60000L))), serializeValue(100L)); + assertEquals(Utils.mkSet(segments.segmentName(0), + segments.segmentName(1)), segmentDirs()); + + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(61000L, 120000L))), serializeValue(200L)); + assertEquals(Utils.mkSet(segments.segmentName(0), + segments.segmentName(1), + segments.segmentName(2)), segmentDirs()); + + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(121000L, 180000L))), serializeValue(300L)); + assertEquals(Utils.mkSet(segments.segmentName(1), + segments.segmentName(2), + segments.segmentName(3)), segmentDirs()); + + bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(181000L, 240000L))), serializeValue(400L)); + assertEquals(Utils.mkSet(segments.segmentName(2), + segments.segmentName(3), + segments.segmentName(4)), segmentDirs()); + + final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000)); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(61000L, 120000L)), 200L), + KeyValue.pair(new Windowed<>(key, new TimeWindow(121000L, 180000L)), 300L), + KeyValue.pair(new Windowed<>(key, new TimeWindow(181000L, 240000L)), 400L) + ), results); + + } + + private Set<String> segmentDirs() { + File windowDir = new File(stateDir, storeName); + + return new HashSet<>(Arrays.asList(windowDir.list())); + } + + private byte[] serializeValue(final long value) { + return Serdes.Long().serializer().serialize("", value); + } + + private Bytes serializeKey(final Windowed<String> key) { + return SessionKeySerde.toBinary(key, Serdes.String().serializer()); + } + + private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) { + final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); + while (iterator.hasNext()) { + final KeyValue<Bytes, byte[]> next = iterator.next(); + final KeyValue<Windowed<String>, Long> deserialized + = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value)); + results.add(deserialized); + } + return results; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java new file mode 100644 index 0000000..11766c7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RocksDBSessionStoreTest { + + private SessionStore<String, Long> sessionStore; + + @Before + public void before() { + final RocksDBSegmentedBytesStore bytesStore = + new RocksDBSegmentedBytesStore("session-store", 10000L, 3, new SessionKeySchema()); + + sessionStore = new RocksDBSessionStore<>(bytesStore, + Serdes.String(), + Serdes.Long()); + + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + new NoOpRecordCollector(), + new ThreadCache(0)); + sessionStore.init(context, sessionStore); + } + + @After + public void close() { + sessionStore.close(); + } + + @Test + public void shouldPutAndFindSessionsInRange() throws Exception { + final String key = "a"; + final Windowed<String> a1 = new Windowed<>(key, new TimeWindow(10, 10L)); + final Windowed<String> a2 = new Windowed<>(key, new TimeWindow(500L, 1000L)); + sessionStore.put(a1, 1L); + sessionStore.put(a2, 2L); + sessionStore.put(new Windowed<>(key, new TimeWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new TimeWindow(2500L, 3000L)), 2L); + + final List<KeyValue<Windowed<String>, Long>> expected + = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + + final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge(key, 0, 1000L); + assertEquals(expected, toList(values)); + } + + @Test + public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { + + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + for (KeyValue<Windowed<String>, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + sessionStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + + final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a")); + assertEquals(expected, results); + + } + + + @Test + public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception { + final String key = "a"; + sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L); + final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge(key, -1, 1000L); + + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L)); + assertEquals(expected, toList(results)); + } + + @Test + public void shouldRemove() throws Exception { + sessionStore.put(new Windowed<>("a", new TimeWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L); + + sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000))); + assertFalse(sessionStore.findSessionsToMerge("a", 0, 1000L).hasNext()); + + assertTrue(sessionStore.findSessionsToMerge("a", 1500, 2500).hasNext()); + } + + @Test + public void shouldFindSessionsToMerge() throws Exception { + final Windowed<String> session1 = new Windowed<>("a", new TimeWindow(0, 100)); + final Windowed<String> session2 = new Windowed<>("a", new TimeWindow(101, 200)); + final Windowed<String> session3 = new Windowed<>("a", new TimeWindow(201, 300)); + final Windowed<String> session4 = new Windowed<>("a", new TimeWindow(301, 400)); + final Windowed<String> session5 = new Windowed<>("a", new TimeWindow(401, 500)); + sessionStore.put(session1, 1L); + sessionStore.put(session2, 2L); + sessionStore.put(session3, 3L); + sessionStore.put(session4, 4L); + sessionStore.put(session5, 5L); + final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge("a", 150, 300); + assertEquals(session2, results.next().key); + assertEquals(session3, results.next().key); + assertFalse(results.hasNext()); + } + + static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) { + final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next()); + } + return results; + } + + +} \ No newline at end of file