http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 3036f79..77b92a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -38,9 +38,12 @@ import java.util.concurrent.ConcurrentHashMap; */ class Segments { private static final Logger log = LoggerFactory.getLogger(Segments.class); - static final long MIN_SEGMENT_INTERVAL = 60 * 1000L; + static long segmentInterval(long retentionPeriod, int numSegments) { + return Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); + } + private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>(); private final String name; private final int numSegments; @@ -52,7 +55,7 @@ class Segments { Segments(final String name, final long retentionPeriod, final int numSegments) { this.name = name; this.numSegments = numSegments; - this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); + this.segmentInterval = segmentInterval(retentionPeriod, numSegments); // Create a date formatter. Formatted timestamps are used as segment name suffixes this.formatter = new SimpleDateFormat("yyyyMMddHHmm"); this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 80785b2..6d6d9bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -23,10 +23,15 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; +import java.nio.ByteBuffer; import java.util.List; class SessionKeySchema implements SegmentedBytesStore.KeySchema { + + private static final int SUFFIX_SIZE = 2 * WindowStoreUtils.TIMESTAMP_SIZE; + private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; + private String topic; @Override @@ -35,33 +40,49 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema { } @Override - public Bytes upperRange(final Bytes key, final long to) { + public Bytes upperRangeFixedSize(final Bytes key, final long to) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override - public Bytes lowerRange(final Bytes key, final long from) { + public Bytes lowerRangeFixedSize(final Bytes key, final long from) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override + public Bytes upperRange(Bytes key, long to) { + final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) + .putLong(to) + // start can at most be equal to end + .putLong(to) + .array(); + return OrderedBytes.upperRange(key, maxSuffix); + } + + @Override + public Bytes lowerRange(Bytes key, long from) { + return OrderedBytes.lowerRange(key, MIN_SUFFIX); + } + + @Override public long segmentTimestamp(final Bytes key) { return SessionKeySerde.extractEnd(key.get()); } @Override - public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) { + public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { return new HasNextCondition() { @Override public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { while (iterator.hasNext()) { final Bytes bytes = iterator.peekNextKey(); final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes); - if (windowedKey.key().equals(binaryKey) - && windowedKey.window().end() >= from - && windowedKey.window().start() <= to) { + if (windowedKey.key().compareTo(binaryKeyFrom) >= 0 + && windowedKey.key().compareTo(binaryKeyTo) <= 0 + && windowedKey.window().end() >= from + && windowedKey.window().start() <= to) { return true; } iterator.next(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index b9a8665..214f36b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -21,9 +21,14 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; +import java.nio.ByteBuffer; import java.util.List; class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + + private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE; + private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; + private StateSerdes<Bytes, byte[]> serdes; @Override @@ -33,21 +38,36 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { @Override public Bytes upperRange(final Bytes key, final long to) { - return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes); + final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) + .putLong(to) + .putInt(Integer.MAX_VALUE) + .array(); + + return OrderedBytes.upperRange(key, maxSuffix); } @Override public Bytes lowerRange(final Bytes key, final long from) { + return OrderedBytes.lowerRange(key, MIN_SUFFIX); + } + + @Override + public Bytes lowerRangeFixedSize(final Bytes key, final long from) { return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes); } @Override + public Bytes upperRangeFixedSize(final Bytes key, final long to) { + return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes); + } + + @Override public long segmentTimestamp(final Bytes key) { return WindowStoreUtils.timestampFromBinaryKey(key.get()); } @Override - public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) { + public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { return new HasNextCondition() { @Override public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { @@ -55,9 +75,10 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { final Bytes bytes = iterator.peekNextKey(); final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get()); final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get()); - if (keyBytes.equals(binaryKey) - && time >= from - && time <= to) { + if (keyBytes.compareTo(binaryKeyFrom) >= 0 + && keyBytes.compareTo(binaryKeyTo) <= 0 + && time >= from + && time <= to) { return true; } iterator.next(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java new file mode 100644 index 0000000..4fd6f3e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.util.NoSuchElementException; + +class WindowStoreIteratorWrapper<K, V> { + + // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs + private static class WrappedWindowStoreBytesIterator extends WindowStoreIteratorWrapper<Bytes, byte[]> { + WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying, + final StateSerdes<Bytes, byte[]> serdes, + final long windowSize) { + super(underlying, serdes, windowSize); + } + + @Override + public WindowStoreIterator<byte[]> valuesIterator() { + return new WrappedWindowStoreIterator<byte[]>(bytesIterator, serdes) { + @Override + public KeyValue<Long, byte[]> next() { + final KeyValue<Bytes, byte[]> next = bytesIterator.next(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); + return KeyValue.pair(timestamp, next.value); + } + }; + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() { + return new WrappedKeyValueIterator<Bytes, byte[]>(bytesIterator, serdes, windowSize) { + @Override + public Windowed<Bytes> peekNextKey() { + final Bytes next = bytesIterator.peekNextKey(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get()); + final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get()); + return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)); + } + + @Override + public KeyValue<Windowed<Bytes>, byte[]> next() { + if (!bytesIterator.hasNext()) { + throw new NoSuchElementException(); + } + + final KeyValue<Bytes, byte[]> next = bytesIterator.next(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); + final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.key.get()); + return KeyValue.pair( + new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), + next.value + ); + } + }; + } + } + + static WindowStoreIteratorWrapper<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying, + final StateSerdes<Bytes, byte[]> serdes, + final long windowSize) { + return new WrappedWindowStoreBytesIterator(underlying, serdes, windowSize); + } + + + protected final KeyValueIterator<Bytes, byte[]> bytesIterator; + protected final StateSerdes<K, V> serdes; + protected final long windowSize; + + WindowStoreIteratorWrapper( + final KeyValueIterator<Bytes, byte[]> bytesIterator, + final StateSerdes<K, V> serdes, + final long windowSize + ) { + this.bytesIterator = bytesIterator; + this.serdes = serdes; + this.windowSize = windowSize; + } + + public WindowStoreIterator<V> valuesIterator() { + return new WrappedWindowStoreIterator<>(bytesIterator, serdes); + } + + public KeyValueIterator<Windowed<K>, V> keyValueIterator() { + return new WrappedKeyValueIterator<>(bytesIterator, serdes, windowSize); + } + + private static class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> { + final KeyValueIterator<Bytes, byte[]> bytesIterator; + final StateSerdes<?, V> serdes; + + WrappedWindowStoreIterator( + KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<?, V> serdes) { + this.bytesIterator = bytesIterator; + this.serdes = serdes; + } + + @Override + public Long peekNextKey() { + return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get()); + } + + @Override + public boolean hasNext() { + return bytesIterator.hasNext(); + } + + @Override + public KeyValue<Long, V> next() { + final KeyValue<Bytes, byte[]> next = bytesIterator.next(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); + final V value = serdes.valueFrom(next.value); + return KeyValue.pair(timestamp, value); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName()); + } + + @Override + public void close() { + bytesIterator.close(); + } + } + + private static class WrappedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> { + final KeyValueIterator<Bytes, byte[]> bytesIterator; + final StateSerdes<K, V> serdes; + final long windowSize; + + WrappedKeyValueIterator( + KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<K, V> serdes, long windowSize) { + this.bytesIterator = bytesIterator; + this.serdes = serdes; + this.windowSize = windowSize; + } + + @Override + public Windowed<K> peekNextKey() { + final byte[] nextKey = bytesIterator.peekNextKey().get(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(nextKey); + final K key = WindowStoreUtils.keyFromBinaryKey(nextKey, serdes); + return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)); + } + + @Override + public boolean hasNext() { + return bytesIterator.hasNext(); + } + + @Override + public KeyValue<Windowed<K>, V> next() { + final KeyValue<Bytes, byte[]> next = bytesIterator.next(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); + final K key = WindowStoreUtils.keyFromBinaryKey(next.key.get(), serdes); + final V value = serdes.valueFrom(next.value); + return KeyValue.pair( + new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), + value + ); + + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName()); + } + + @Override + public void close() { + bytesIterator.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index faf2899..ed79947 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -19,14 +19,15 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.StateSerdes; import java.nio.ByteBuffer; public class WindowStoreUtils { - private static final int SEQNUM_SIZE = 4; - private static final int TIMESTAMP_SIZE = 8; + static final int SEQNUM_SIZE = 4; + static final int TIMESTAMP_SIZE = 8; /** Inner byte array serde used for segments */ static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes(); @@ -73,4 +74,13 @@ public class WindowStoreUtils { static int sequenceNumberFromBinaryKey(byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE); } + + /** + * Safely construct a time window of the given size, + * taking care of bounding endMs to Long.MAX_VALUE if necessary + */ + static TimeWindow timeWindowForSize(final long startMs, final long windowSize) { + final long endMs = startMs + windowSize; + return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java deleted file mode 100644 index 1ce6b04..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.streams.state.WindowStoreIterator; - -import java.util.NoSuchElementException; - -class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> { - final KeyValueIterator<Bytes, byte[]> bytesIterator; - private final StateSerdes<?, V> serdes; - - // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs - private static class WrappedWindowStoreBytesIterator extends WrappedWindowStoreIterator<byte[]> { - WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying, - final StateSerdes<Bytes, byte[]> serdes) { - super(underlying, serdes); - } - - @Override - public KeyValue<Long, byte[]> next() { - if (!bytesIterator.hasNext()) { - throw new NoSuchElementException(); - } - - final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); - final byte[] value = next.value; - return KeyValue.pair(timestamp, value); - } - } - - static WrappedWindowStoreIterator<byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying, - final StateSerdes<Bytes, byte[]> serdes) { - return new WrappedWindowStoreBytesIterator(underlying, serdes); - } - - WrappedWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) { - this.bytesIterator = bytesIterator; - this.serdes = serdes; - } - - @Override - public boolean hasNext() { - return bytesIterator.hasNext(); - } - - /** - * @throws NoSuchElementException if no next element exists - */ - @Override - public KeyValue<Long, V> next() { - final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get()); - final V value = serdes.valueFrom(next.value); - return KeyValue.pair(timestamp, value); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName()); - } - - @Override - public void close() { - bytesIterator.close(); - } - - @Override - public Long peekNextKey() { - return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 24e0329..97a1408 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -131,23 +131,23 @@ public class KStreamWindowAggregateTest { assertEquals(Utils.mkList( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1", - - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3", - - "[A@5]:0+1+1", "[A@10]:0+1", - "[B@5]:0+2+2+2", "[B@10]:0+2", - "[D@5]:0+4+4", "[D@10]:0+4", - "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", - "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); + "[A@0/10]:0+1", + "[B@0/10]:0+2", + "[C@0/10]:0+3", + "[D@0/10]:0+4", + "[A@0/10]:0+1+1", + + "[A@0/10]:0+1+1+1", "[A@5/15]:0+1", + "[B@0/10]:0+2+2", "[B@5/15]:0+2", + "[D@0/10]:0+4+4", "[D@5/15]:0+4", + "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2", + "[C@0/10]:0+3+3", "[C@5/15]:0+3", + + "[A@5/15]:0+1+1", "[A@10/20]:0+1", + "[B@5/15]:0+2+2+2", "[B@10/20]:0+2", + "[D@5/15]:0+4+4", "[D@10/20]:0+4", + "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2", + "[C@5/15]:0+3+3", "[C@10/20]:0+3"), proc2.processed); } private void setRecordContext(final long time, final String topic) { @@ -210,11 +210,11 @@ public class KStreamWindowAggregateTest { driver.flushState(); proc1.checkAndClearProcessResult( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1" + "[A@0/10]:0+1", + "[B@0/10]:0+2", + "[C@0/10]:0+3", + "[D@0/10]:0+4", + "[A@0/10]:0+1+1" ); proc2.checkAndClearProcessResult(); proc3.checkAndClearProcessResult(); @@ -236,11 +236,11 @@ public class KStreamWindowAggregateTest { driver.flushState(); proc1.checkAndClearProcessResult( - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3" + "[A@0/10]:0+1+1+1", "[A@5/15]:0+1", + "[B@0/10]:0+2+2", "[B@5/15]:0+2", + "[D@0/10]:0+4+4", "[D@5/15]:0+4", + "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2", + "[C@0/10]:0+3+3", "[C@5/15]:0+3" ); proc2.checkAndClearProcessResult(); proc3.checkAndClearProcessResult(); @@ -263,18 +263,18 @@ public class KStreamWindowAggregateTest { proc1.checkAndClearProcessResult(); proc2.checkAndClearProcessResult( - "[A@0]:0+a", - "[B@0]:0+b", - "[C@0]:0+c", - "[D@0]:0+d", - "[A@0]:0+a+a" + "[A@0/10]:0+a", + "[B@0/10]:0+b", + "[C@0/10]:0+c", + "[D@0/10]:0+d", + "[A@0/10]:0+a+a" ); proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a", - "[B@0]:0+2+2+2%0+b", - "[C@0]:0+3+3%0+c", - "[D@0]:0+4+4%0+d", - "[A@0]:0+1+1+1%0+a+a"); + "[A@0/10]:0+1+1+1%0+a", + "[B@0/10]:0+2+2+2%0+b", + "[C@0/10]:0+3+3%0+c", + "[D@0/10]:0+4+4%0+d", + "[A@0/10]:0+1+1+1%0+a+a"); setRecordContext(5, topic1); driver.process(topic2, "A", "a"); @@ -293,18 +293,18 @@ public class KStreamWindowAggregateTest { driver.flushState(); proc1.checkAndClearProcessResult(); proc2.checkAndClearProcessResult( - "[A@0]:0+a+a+a", "[A@5]:0+a", - "[B@0]:0+b+b", "[B@5]:0+b", - "[D@0]:0+d+d", "[D@5]:0+d", - "[B@0]:0+b+b+b", "[B@5]:0+b+b", - "[C@0]:0+c+c", "[C@5]:0+c" + "[A@0/10]:0+a+a+a", "[A@5/15]:0+a", + "[B@0/10]:0+b+b", "[B@5/15]:0+b", + "[D@0/10]:0+d+d", "[D@5/15]:0+d", + "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b", + "[C@0/10]:0+c+c", "[C@5/15]:0+c" ); proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", - "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", - "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", - "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", - "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" + "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a", + "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b", + "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d", + "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b", + "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c" ); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index e7bd187..3ad6475 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -55,4 +56,9 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) { return null; } + + @Override + public WindowStoreIterator<KeyValue> fetch(Object from, Object to, long timeFrom, long timeTo) { + return null; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index f8eec1c..bfc20ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -63,11 +63,15 @@ public class CachingSessionStoreTest { public void setUp() throws Exception { final SessionKeySchema schema = new SessionKeySchema(); schema.init("topic"); - underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema); + final int retention = 60000; + final int numSegments = 3; + underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, schema); final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), - Serdes.Long()); + Serdes.Long(), + Segments.segmentInterval(retention, numSegments) + ); cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); @@ -86,6 +90,8 @@ public class CachingSessionStoreTest { cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L); cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L); + assertEquals(3, cache.size()); + final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0); final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0); @@ -93,7 +99,35 @@ public class CachingSessionStoreTest { assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next()); assertFalse(a.hasNext()); assertFalse(b.hasNext()); + } + + @Test + public void shouldPutFetchAllKeysFromCache() throws Exception { + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L); + + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<String>, Long> all = cachingStore.findSessions("a", "b", 0, 0); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), all.next()); + assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), all.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), all.next()); + assertFalse(all.hasNext()); + } + + @Test + public void shouldPutFetchRangeFromCache() throws Exception { + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L); + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<String>, Long> some = cachingStore.findSessions("aa", "b", 0, 0); + assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), some.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), some.next()); + assertFalse(some.hasNext()); } @Test @@ -164,6 +198,29 @@ public class CachingSessionStoreTest { } @Test + public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception { + final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed<String> aa1 = new Windowed<>("aa", new SessionWindow(0, 0)); + final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); + final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); + final Windowed<String> aa3 = new Windowed<>("aa", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, 1L); + cachingStore.put(aa1, 1L); + cachingStore.put(a2, 2L); + cachingStore.put(a3, 3L); + cachingStore.put(aa3, 3L); + cachingStore.flush(); + + final KeyValueIterator<Windowed<String>, Long> rangeResults = cachingStore.findSessions("a", "aa", 0, Segments.MIN_SEGMENT_INTERVAL * 2); + assertEquals(a1, rangeResults.next().key); + assertEquals(aa1, rangeResults.next().key); + assertEquals(a2, rangeResults.next().key); + assertEquals(a3, rangeResults.next().key); + assertEquals(aa3, rangeResults.next().key); + assertFalse(rangeResults.hasNext()); + } + + @Test public void shouldForwardChangedValuesDuringFlush() throws Exception { final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0)); final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 5b3123e..faf6e83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -64,13 +64,16 @@ public class CachingWindowStoreTest { @Before public void setUp() throws Exception { keySchema = new WindowKeySchema(); - underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema); - final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false); + final int retention = 30000; + final int numSegments = 3; + underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, keySchema); + final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE); cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), - WINDOW_SIZE); + WINDOW_SIZE, + Segments.segmentInterval(retention, numSegments)); cachingStore.setFlushListener(cacheListener); cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; @@ -100,6 +103,19 @@ public class CachingWindowStoreTest { } @Test + public void shouldPutFetchRangeFromCache() throws Exception { + cachingStore.put("a", "a"); + cachingStore.put("b", "b"); + + final KeyValueIterator<Windowed<String>, String> iterator = cachingStore.fetch("a", "b", 10, 10); + assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), iterator.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "b"), iterator.next()); + assertFalse(iterator.hasNext()); + assertEquals(2, cache.size()); + } + + + @Test public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception { int added = addItemsToCache(); // all dirty entries should have been flushed @@ -171,6 +187,19 @@ public class CachingWindowStoreTest { } @Test + public void shouldIterateCacheAndStoreKeyRange() throws Exception { + final Bytes key = Bytes.wrap("1" .getBytes()); + underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes()); + cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); + + final KeyValueIterator<Windowed<String>, String> fetchRange = + cachingStore.fetch("1", "2", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE); + assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), fetchRange.next()); + assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)), "b"), fetchRange.next()); + assertFalse(fetchRange.hasNext()); + } + + @Test public void shouldClearNamespaceCacheOnClose() throws Exception { cachingStore.put("a", "a"); assertEquals(1, cache.size()); @@ -185,12 +214,17 @@ public class CachingWindowStoreTest { } @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.fetch("a", "b", 0, 10); + } + + @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception { cachingStore.close(); cachingStore.put("a", "a"); } - @SuppressWarnings("unchecked") @Test public void shouldFetchAndIterateOverExactKeys() throws Exception { cachingStore.put("a", "0001", 0); @@ -203,6 +237,34 @@ public class CachingWindowStoreTest { assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); } + @Test + public void shouldFetchAndIterateOverKeyRange() throws Exception { + cachingStore.put("a", "0001", 0); + cachingStore.put("aa", "0002", 0); + cachingStore.put("a", "0003", 1); + cachingStore.put("aa", "0004", 1); + cachingStore.put("a", "0005", 60000); + + assertThat( + toList(cachingStore.fetch("a", "a", 0, Long.MAX_VALUE)), + equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L))) + ); + + assertThat( + toList(cachingStore.fetch("aa", "aa", 0, Long.MAX_VALUE)), + equalTo(Utils.mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1))) + ); + + assertThat( + toList(cachingStore.fetch("a", "aa", 0, Long.MAX_VALUE)), + equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L))) + ); + } + + private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) { + return KeyValue.pair(new Windowed<>(key, new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), value); + } + private int addItemsToCache() throws IOException { int cachedSize = 0; int i = 0; @@ -216,4 +278,4 @@ public class CachingWindowStoreTest { return i; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 6f4ff07..b6e95a7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals; public class CompositeReadOnlyWindowStoreTest { + private static final long WINDOW_SIZE = 30_000; private final String storeName = "window-store"; private StateStoreProviderStub stubProviderOne; private StateStoreProviderStub stubProviderTwo; @@ -54,10 +55,10 @@ public class CompositeReadOnlyWindowStoreTest { public void before() { stubProviderOne = new StateStoreProviderStub(false); stubProviderTwo = new StateStoreProviderStub(false); - underlyingWindowStore = new ReadOnlyWindowStoreStub<>(); + underlyingWindowStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); stubProviderOne.addStore(storeName, underlyingWindowStore); - otherUnderlyingStore = new ReadOnlyWindowStoreStub<>(); + otherUnderlyingStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); stubProviderOne.addStore("other-window-store", otherUnderlyingStore); @@ -89,7 +90,7 @@ public class CompositeReadOnlyWindowStoreTest { @Test public void shouldFindValueForKeyWhenMultiStores() throws Exception { final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new - ReadOnlyWindowStoreStub<>(); + ReadOnlyWindowStoreStub<>(WINDOW_SIZE); stubProviderTwo.addStore(storeName, secondUnderlying); underlyingWindowStore.put("key-one", "value-one", 0L); @@ -162,4 +163,4 @@ public class CompositeReadOnlyWindowStoreTest { windowStoreIterator.next(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index acded8c..6cc77df 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -34,6 +34,18 @@ import static org.junit.Assert.assertTrue; public class FilteredCacheIteratorTest { + private static final CacheFunction IDENTITY_FUNCTION = new CacheFunction() { + @Override + public Bytes key(Bytes cacheKey) { + return cacheKey; + } + + @Override + public Bytes cacheKey(Bytes key) { + return key; + } + }; + @SuppressWarnings("unchecked") private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null); private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), @@ -58,8 +70,8 @@ public class FilteredCacheIteratorTest { } }; allIterator = new FilteredCacheIterator( - new DelegatingPeekingKeyValueIterator<>("", - store.all()), allCondition); + new DelegatingPeekingKeyValueIterator<>("", + store.all()), allCondition, IDENTITY_FUNCTION); final HasNextCondition firstEntryCondition = new HasNextCondition() { @Override @@ -69,7 +81,7 @@ public class FilteredCacheIteratorTest { }; firstEntryIterator = new FilteredCacheIterator( new DelegatingPeekingKeyValueIterator<>("", - store.all()), firstEntryCondition); + store.all()), firstEntryCondition, IDENTITY_FUNCTION); } @@ -115,4 +127,4 @@ public class FilteredCacheIteratorTest { allIterator.remove(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index d3d8f40..ee5e529 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -36,6 +36,13 @@ import static org.junit.Assert.assertTrue; public class MergedSortedCacheWrappedSessionStoreIteratorTest { + private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { + @Override + public long segmentId(Bytes key) { + return 0; + } + }; + private final String storeKey = "a"; private final String cacheKey = "b"; @@ -43,10 +50,13 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton( KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator(); private final SessionWindow cacheWindow = new SessionWindow(10, 20); - private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair( - SessionKeySerde.toBinary( - new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes()))) - .iterator(); + private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton( + KeyValue.pair( + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey( + SessionKeySerde.toBinary(new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy") + ), + new LRUCacheEntry(cacheKey.getBytes()) + )).iterator(); @Test public void shouldHaveNextFromStore() throws Exception { @@ -106,7 +116,10 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); - return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String())); + return new MergedSortedCacheSessionStoreIterator<>( + cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()), + SINGLE_SEGMENT_CACHE_FUNCTION + ); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java index 2048688..fed39b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java @@ -36,6 +36,13 @@ import static org.junit.Assert.assertEquals; public class MergedSortedCacheWrappedWindowStoreIteratorTest { + private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { + @Override + public long segmentId(Bytes key) { + return 0; + } + }; + private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>(); private final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); private final String namespace = "one"; @@ -52,16 +59,20 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { final Bytes keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes); final byte[] valBytes = String.valueOf(t + 10).getBytes(); expectedKvPairs.add(KeyValue.pair(t + 10, valBytes)); - cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes)); + cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes)); } Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range( + namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) + ); - final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>( + cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()) + ); int index = 0; while (iterator.hasNext()) { final KeyValue<Long, byte[]> next = iterator.next(); @@ -74,12 +85,16 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { @Test public void shouldPeekNextStoreKey() throws Exception { windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes())); - cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); + cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes)), new LRUCacheEntry("b".getBytes())); Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes); - final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range( + namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) + ); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>( + cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()) + ); assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(10L)); @@ -88,15 +103,14 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { @Test public void shouldPeekNextCacheKey() throws Exception { windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes())); - cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); + cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes())); Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)); final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(10L)); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java new file mode 100644 index 0000000..114a150 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.KeyValueIteratorStub; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest { + private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { + @Override + public long segmentId(Bytes key) { + return 0; + } + }; + private static final int WINDOW_SIZE = 10; + + private final String storeKey = "a"; + private final String cacheKey = "b"; + + private final TimeWindow storeWindow = new TimeWindow(0, 1); + private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton( + KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator(); + private final TimeWindow cacheWindow = new TimeWindow(10, 20); + private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton( + KeyValue.pair( + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey( + WindowStoreUtils.toBinaryKey( + cacheKey, cacheWindow.start(), 0, + new StateSerdes<>("dummy", Serdes.String(), Serdes.String()) + ) + ), + new LRUCacheEntry(cacheKey.getBytes()) + )).iterator(); + + @Test + public void shouldHaveNextFromStore() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldGetNextFromStore() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); + } + + @Test + public void shouldPeekNextKeyFromStore() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); + } + + @Test + public void shouldHaveNextFromCache() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), + cacheKvs); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldGetNextFromCache() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); + } + + @Test + public void shouldPeekNextKeyFromCache() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); + } + + @Test + public void shouldIterateBothStoreAndCache() throws Exception { + final MergedSortedCacheWindowStoreKeyValueIterator<String, String> iterator = createIterator(storeKvs, cacheKvs); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); + assertFalse(iterator.hasNext()); + } + + private MergedSortedCacheWindowStoreKeyValueIterator<String, String> createIterator( + final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs, + final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs + ) { + final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator + = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs)); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator + = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); + return new MergedSortedCacheWindowStoreKeyValueIterator<>( + cacheIterator, + storeIterator, + new StateSerdes<>("name", Serdes.String(), Serdes.String()), + WINDOW_SIZE, + SINGLE_SEGMENT_CACHE_FUNCTION + ); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index e37e0b4..6974240 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -18,8 +18,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -28,15 +31,23 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; /** * A very simple window store stub for testing purposes. */ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore { - private final Map<Long, Map<K, V>> data = new HashMap<>(); + private final long windowSize; + private final Map<Long, NavigableMap<K, V>> data = new HashMap<>(); private boolean open = true; + public ReadOnlyWindowStoreStub(long windowSize) { + this.windowSize = windowSize; + } + @Override public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { if (!open) { @@ -52,9 +63,54 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, return new TheWindowStoreIterator<>(results.iterator()); } + @Override + public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) { + if (!open) { + throw new InvalidStateStoreException("Store is not open"); + } + final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>(); + for (long now = timeFrom; now <= timeTo; now++) { + final NavigableMap<K, V> kvMap = data.get(now); + if (kvMap != null) { + for (Entry<K, V> entry : kvMap.subMap(from, true, to, true).entrySet()) { + results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); + } + } + } + final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); + + return new KeyValueIterator<Windowed<K>, V>() { + @Override + public void close() { + + } + + @Override + public Windowed<K> peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public KeyValue<Windowed<K>, V> next() { + return iterator.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported in " + getClass().getName()); + } + }; + } + public void put(final K key, final V value, final long timestamp) { if (!data.containsKey(timestamp)) { - data.put(timestamp, new HashMap<K, V>()); + data.put(timestamp, new TreeMap<K, V>()); } data.get(timestamp).put(key, value); } @@ -123,7 +179,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, @Override public void remove() { - + throw new UnsupportedOperationException("remove() not supported in " + getClass().getName()); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index f5998dc..c30b0e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -168,13 +168,30 @@ public class RocksDBSessionStoreTest { sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); - final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE); - final List<Long> results = new ArrayList<>(); + KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE); + List<Long> results = new ArrayList<>(); while (iterator.hasNext()) { results.add(iterator.next().value); } assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L))); + + + iterator = sessionStore.findSessions("aa", 0, Long.MAX_VALUE); + results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next().value); + } + + assertThat(results, equalTo(Arrays.asList(2L, 4L))); + + + final KeyValueIterator<Windowed<String>, Long> rangeIterator = sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE); + final List<Long> rangeResults = new ArrayList<>(); + while (rangeIterator.hasNext()) { + rangeResults.add(rangeIterator.next().value); + } + assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L))); } static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
