KAFKA-5192: add WindowStore range scan (KIP-155). Implements range scan for keys in windowed and session stores.
Modifies caching session and windowed stores to use segmented cache keys. Cache keys are internally prefixed with their segment id to ensure key ordering in the cache matches the ordering in the underlying store for keys spread across multiple segments. This should also result in fewer cache keys getting scanned for queries spanning only some segments. Author: Xavier Léauté <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #3027 from xvrl/windowstore-range-scan Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e2875235 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e2875235 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e2875235 Branch: refs/heads/trunk Commit: e28752357705568219315375c666f8e500db9c12 Parents: b661d3b Author: Xavier Léauté <[email protected]> Authored: Thu May 18 17:02:51 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu May 18 17:02:51 2017 -0700 ---------------------------------------------------------------------- checkstyle/checkstyle.xml | 7 +- .../org/apache/kafka/common/utils/Bytes.java | 6 +- .../apache/kafka/streams/kstream/Windowed.java | 2 +- .../streams/state/ReadOnlySessionStore.java | 12 + .../streams/state/ReadOnlyWindowStore.java | 14 + .../kafka/streams/state/SessionStore.java | 15 + .../AbstractMergedSortedCacheStoreIterator.java | 18 +- .../streams/state/internals/CacheFunction.java | 25 ++ .../state/internals/CachingSessionStore.java | 56 +++- .../state/internals/CachingWindowStore.java | 64 +++- .../ChangeLoggingSegmentedBytesStore.java | 6 +- .../CompositeReadOnlySessionStore.java | 46 ++- .../internals/CompositeReadOnlyWindowStore.java | 49 ++- .../state/internals/FilteredCacheIterator.java | 41 ++- .../state/internals/KeyValueIterators.java | 71 ++++ .../MergedSortedCacheKeyValueStoreIterator.java | 12 +- .../MergedSortedCacheSessionStoreIterator.java | 39 ++- .../MergedSortedCacheWindowStoreIterator.java 
| 22 +- ...dSortedCacheWindowStoreKeyValueIterator.java | 75 +++++ .../internals/MeteredSegmentedBytesStore.java | 7 +- .../streams/state/internals/OrderedBytes.java | 68 ++++ .../internals/RocksDBSegmentedBytesStore.java | 18 +- .../state/internals/RocksDBSessionStore.java | 15 +- .../internals/RocksDBSessionStoreSupplier.java | 5 +- .../state/internals/RocksDBWindowStore.java | 48 ++- .../internals/RocksDBWindowStoreSupplier.java | 8 +- .../state/internals/SegmentedBytesStore.java | 46 ++- .../state/internals/SegmentedCacheFunction.java | 76 +++++ .../kafka/streams/state/internals/Segments.java | 7 +- .../state/internals/SessionKeySchema.java | 33 +- .../state/internals/WindowKeySchema.java | 31 +- .../internals/WindowStoreIteratorWrapper.java | 195 +++++++++++ .../state/internals/WindowStoreUtils.java | 14 +- .../internals/WrappedWindowStoreIterator.java | 91 ----- .../internals/KStreamWindowAggregateTest.java | 94 +++--- .../kafka/streams/state/NoOpWindowStore.java | 6 + .../internals/CachingSessionStoreTest.java | 61 +++- .../state/internals/CachingWindowStoreTest.java | 72 +++- .../CompositeReadOnlyWindowStoreTest.java | 9 +- .../internals/FilteredCacheIteratorTest.java | 20 +- ...tedCacheWrappedSessionStoreIteratorTest.java | 25 +- ...rtedCacheWrappedWindowStoreIteratorTest.java | 32 +- ...eWrappedWindowStoreKeyValueIteratorTest.java | 132 ++++++++ .../internals/ReadOnlyWindowStoreStub.java | 62 +++- .../internals/RocksDBSessionStoreTest.java | 21 +- .../state/internals/RocksDBWindowStoreTest.java | 331 ++++++++++++------- .../internals/SegmentedCacheFunctionTest.java | 124 +++++++ .../state/internals/SessionKeySchemaTest.java | 92 +++++- .../state/internals/WindowKeySchemaTest.java | 131 ++++++++ .../kafka/test/ReadOnlySessionStoreStub.java | 45 ++- .../kafka/test/SegmentedBytesStoreStub.java | 5 + 51 files changed, 2044 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index ed846cd..9f9e9ae 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -57,10 +57,13 @@ <module name="LocalFinalVariableName"/> <module name="MemberName"/> <module name="ClassTypeParameterName"> - <property name="format" value="^[A-Z0-9]*$"/> + <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/> </module> <module name="MethodTypeParameterName"> - <property name="format" value="^[A-Z0-9]*$"/> + <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/> + </module> + <module name="InterfaceTypeParameterName"> + <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/> </module> <module name="PackageName"/> <module name="ParameterName"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java index cc794c5..3044020 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java @@ -50,7 +50,7 @@ public class Bytes implements Comparable<Bytes> { /** * Get the data from the Bytes. - * @return The data is only valid between offset and offset+length. + * @return The underlying byte array */ public byte[] get() { return this.bytes; @@ -139,9 +139,9 @@ public class Bytes implements Comparable<Bytes> { /** * A byte array comparator based on lexicograpic ordering. 
*/ - public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); + public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); - private interface ByteArrayComparator extends Comparator<byte[]>, Serializable { + public interface ByteArrayComparator extends Comparator<byte[]>, Serializable { int compare(final byte[] buffer1, int offset1, int length1, final byte[] buffer2, int offset2, int length2); http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index aa5157d..7234797 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -72,7 +72,7 @@ public class Windowed<K> { @Override public String toString() { - return "[" + key + "@" + window.start() + "]"; + return "[" + key + "@" + window.start() + "/" + window.end() + "]"; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 5bc8a42..7079769 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -41,4 +41,16 @@ public interface ReadOnlySessionStore<K, AGG> { * @return KeyValueIterator containing all sessions for the provided key. 
*/ KeyValueIterator<Windowed<K>, AGG> fetch(final K key); + + /** + * Retrieve all aggregated sessions for the given range of keys + * + * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest + * available session to the newest/latest session. + * + * @param from first key in the range to find aggregated session values for + * @param to last key in the range to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key. + */ + KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index 5546000..51864e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; /** * A window store that only supports read operations @@ -59,4 +60,17 @@ public interface ReadOnlyWindowStore<K, V> { * @throws InvalidStateStoreException if the store is not initialized */ WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); + + /** + * Get all the key-value pairs in the given key range and time range from all + * the existing windows. 
+ * + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive) + * @param timeTo time range end (inclusive) + * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} + * @throws InvalidStateStoreException if the store is not initialized + */ + KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 8ae5c06..a4cf12e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -29,10 +29,25 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K /** * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions * start is ≤ latestSessionStartTime + * + * @param key the key to return sessions for + * @param earliestSessionEndTime + * @return iterator of sessions with the matching key and aggregated values */ KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime); /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime + * @return iterator of sessions with the matching keys and aggregated values + */ + KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long 
earliestSessionEndTime, final long latestSessionStartTime); + + /** * Remove the session aggregated with provided {@link Windowed} key from the store * @param sessionKey key of the session to remove */ http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index c5c1a2c..6eb9a0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; import java.util.NoSuchElementException; @@ -29,27 +28,26 @@ import java.util.NoSuchElementException; * @param <K> * @param <V> */ -abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyValueIterator<K, V> { +abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> { private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator; - private final KeyValueIterator<KS, byte[]> storeIterator; - protected final StateSerdes<K, V> serdes; + private final KeyValueIterator<KS, VS> storeIterator; AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, - final KeyValueIterator<KS, byte[]> storeIterator, - final StateSerdes<K, V> serdes) { + final KeyValueIterator<KS, VS> storeIterator) 
{ this.cacheIterator = cacheIterator; this.storeIterator = storeIterator; - this.serdes = serdes; } abstract int compare(final Bytes cacheKey, final KS storeKey); abstract K deserializeStoreKey(final KS key); - abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, byte[]> pair); + abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair); abstract K deserializeCacheKey(final Bytes cacheKey); + abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry); + private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) { return nextFromCache.value.value == null; } @@ -101,7 +99,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa } private KeyValue<K, V> nextStoreValue(KS nextStoreKey) { - final KeyValue<KS, byte[]> next = storeIterator.next(); + final KeyValue<KS, VS> next = storeIterator.next(); if (!next.key.equals(nextStoreKey)) { throw new IllegalStateException("Next record key is not the peeked key value; this should not happen"); @@ -117,7 +115,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa throw new IllegalStateException("Next record key is not the peeked key value; this should not happen"); } - return KeyValue.pair(deserializeCacheKey(next.key), serdes.valueFrom(next.value.value)); + return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java new file mode 100644 index 0000000..66ef2d7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java @@ -0,0 +1,25 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; + +interface CacheFunction { + Bytes key(Bytes cacheKey); + Bytes cacheKey(Bytes cacheKey); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 41e81eb..37d0c20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -40,6 +40,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i private final SessionKeySchema keySchema; private final Serde<K> keySerde; private final Serde<AGG> aggSerde; + private final SegmentedCacheFunction cacheFunction; private String cacheName; private ThreadCache cache; private StateSerdes<K, AGG> serdes; @@ -49,12 +50,14 @@ 
class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore, final Serde<K> keySerde, - final Serde<AGG> aggSerde) { + final Serde<AGG> aggSerde, + final long segmentInterval) { super(bytesStore); this.bytesStore = bytesStore; this.keySerde = keySerde; this.aggSerde = aggSerde; this.keySchema = new SessionKeySchema(); + this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); } public void init(final ProcessorContext context, final StateStore root) { @@ -91,15 +94,43 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final long latestSessionStartTime) { validateStoreOpen(); final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key)); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, - keySchema.lowerRange(binarySessionId, earliestSessionEndTime), - keySchema.upperRange(binarySessionId, latestSessionStartTime)); - final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(binarySessionId, earliestSessionEndTime, latestSessionStartTime); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(binarySessionId, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(binarySessionId, latestSessionStartTime)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); + + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions( + binarySessionId, earliestSessionEndTime, latestSessionStartTime + ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId, + binarySessionId, earliestSessionEndTime, latestSessionStartTime); - final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition); - return new 
MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom, + K keyTo, + long earliestSessionEndTime, + long latestSessionStartTime) { + validateStoreOpen(); + final Bytes binarySessionIdFrom = Bytes.wrap(serdes.rawKey(keyFrom)); + final Bytes binarySessionIdTo = Bytes.wrap(serdes.rawKey(keyTo)); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(binarySessionIdFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(binarySessionIdTo, latestSessionStartTime)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); + + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions( + binarySessionIdFrom, binarySessionIdTo, earliestSessionEndTime, latestSessionStartTime + ); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionIdFrom, binarySessionIdTo, + earliestSessionEndTime, + latestSessionStartTime); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction); } @Override @@ -114,7 +145,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), 
context.topic()); - cache.put(cacheName, binaryKey, entry); + cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); } @Override @@ -122,8 +153,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i return findSessions(key, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) { + return findSessions(from, to, 0, Long.MAX_VALUE); + } + + + private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) { - final Bytes binaryKey = entry.key(); + final Bytes binaryKey = cacheFunction.key(entry.key()); final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index f492573..9a4a97c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -19,14 +19,14 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.internals.CacheFlushListener; -import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.CacheFlushListener; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -35,27 +35,32 @@ import java.util.List; class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> { + private final WindowStore<Bytes, byte[]> underlying; private final Serde<K> keySerde; private final Serde<V> valueSerde; private final long windowSize; private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema(); + private String name; private ThreadCache cache; private InternalProcessorContext context; private StateSerdes<K, V> serdes; private CacheFlushListener<Windowed<K>, V> flushListener; + private final SegmentedCacheFunction cacheFunction; CachingWindowStore(final WindowStore<Bytes, byte[]> underlying, final Serde<K> keySerde, final Serde<V> valueSerde, - final long windowSize) { + final long windowSize, + final long segmentInterval) { super(underlying); this.underlying = underlying; this.keySerde = keySerde; this.valueSerde = valueSerde; this.windowSize = windowSize; + this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); } @SuppressWarnings("unchecked") @@ -80,11 +85,11 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @Override public void apply(final List<ThreadCache.DirtyEntry> entries) { for (ThreadCache.DirtyEntry entry : entries) { - final byte[] binaryWindowKey = entry.key().get(); + final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get(); final 
long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey); final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes), - new TimeWindow(timestamp, timestamp + windowSize)); + WindowStoreUtils.timeWindowForSize(timestamp, windowSize)); final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey); maybeForward(entry, key, windowedKey, (InternalProcessorContext) context); underlying.put(key, entry.newValue(), timestamp); @@ -140,7 +145,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), timestamp, context.partition(), context.topic()); - cache.put(name, keyBytes, entry); + cache.put(name, cacheFunction.cacheKey(keyBytes), entry); } @Override @@ -149,23 +154,55 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl // if store is open outside as well. 
validateStoreOpen(); - Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); - Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes); - final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key)); final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes, + keyBytes, timeFrom, timeTo); - final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator( + cacheIterator, hasNextCondition, cacheFunction + ); return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde())); } + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { + // since this function may not access the underlying inner store, we need to validate + // if store is open outside as well. 
+ validateStoreOpen(); + + final Bytes keyFromBytes = Bytes.wrap(serdes.rawKey(from)); + final Bytes keyToBytes = Bytes.wrap(serdes.rawKey(to)); + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(keyFromBytes, keyToBytes, timeFrom, timeTo); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFromBytes, timeFrom)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyToBytes, timeTo)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); + + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFromBytes, + keyToBytes, + timeFrom, + timeTo); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + + return new MergedSortedCacheWindowStoreKeyValueIterator<>( + filteredCacheIterator, + underlyingIterator, + serdes, + windowSize, + cacheFunction + ); + } + private V fetchPrevious(final Bytes key, final long timestamp) { try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) { if (!iter.hasNext()) { @@ -175,4 +212,5 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl } } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index d23e115..9a826c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -37,12 +37,16 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt } @Override - @SuppressWarnings("unchecked") public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) { return bytesStore.fetch(key, from, to); } @Override + public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) { + return bytesStore.fetch(keyFrom, keyTo, from, to); + } + + @Override public void remove(final Bytes key) { bytesStore.remove(key); changeLogger.logChange(key, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 5f4fc64..d63ab4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; @@ -24,7 +23,6 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlySessionStore; import java.util.List; -import java.util.NoSuchElementException; /** * Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link @@ -43,13 +41,15 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore this.storeName = storeName; } + private interface Fetcher<K, V> { + KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store); + } - @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K key) { + private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher) { final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore<K, V> store : stores) { try { - final KeyValueIterator<Windowed<K>, V> result = store.fetch(key); + final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store); if (!result.hasNext()) { result.close(); } else { @@ -57,33 +57,31 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore } } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + - " and may have been migrated to another instance; " + - "please re-discover its location from the state metadata."); + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."); } } - return new KeyValueIterator<Windowed<K>, V>() { - @Override - public void close() { - } - - @Override - public Windowed<K> peekNextKey() { - throw new NoSuchElementException(); - } + return KeyValueIterators.emptyIterator(); + } - @Override - public boolean hasNext() { - return false; - } + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K key) { + return fetch(new Fetcher<K, V>() { @Override - public KeyValue<Windowed<K>, V> next() { - throw new NoSuchElementException(); + public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store) { + return store.fetch(key); } + }); + } + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) { + return fetch(new Fetcher<K, V>() { @Override - public void remove() { + public KeyValueIterator<Windowed<K>, V> 
fetch(ReadOnlySessionStore<K, V> store) { + return store.fetch(from, to); } - }; + }); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 22ec3c6..fbfb5aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import java.util.List; -import java.util.NoSuchElementException; /** * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link @@ -43,45 +43,58 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K this.storeName = storeName; } - @Override - public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { + private interface Fetcher<K, V, IteratorType extends KeyValueIterator<?, V>> { + IteratorType fetch(ReadOnlyWindowStore<K, V> store); + IteratorType empty(); + } + + public <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K, V, IteratorType> fetcher) { final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType); for 
(ReadOnlyWindowStore<K, V> windowStore : stores) { try { - final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo); + final IteratorType result = fetcher.fetch(windowStore); if (!result.hasNext()) { result.close(); } else { return result; } } catch (InvalidStateStoreException e) { - throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); + throw new InvalidStateStoreException( + "State store is not available anymore and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."); } } - return new WindowStoreIterator<V>() { - @Override - public void close() { - } + return fetcher.empty(); + } + @Override + public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { + return fetch(new Fetcher<K, V, WindowStoreIterator<V>>() { @Override - public Long peekNextKey() { - throw new NoSuchElementException(); + public WindowStoreIterator<V> fetch(ReadOnlyWindowStore<K, V> store) { + return store.fetch(key, timeFrom, timeTo); } @Override - public boolean hasNext() { - return false; + public WindowStoreIterator<V> empty() { + return KeyValueIterators.emptyWindowStoreIterator(); } + }); + } + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { + return fetch(new Fetcher<K, V, KeyValueIterator<Windowed<K>, V>>() { @Override - public KeyValue<Long, V> next() { - throw new NoSuchElementException(); + public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlyWindowStore<K, V> store) { + return store.fetch(from, to, timeFrom, timeTo); } @Override - public void remove() { + public KeyValueIterator<Windowed<K>, V> empty() { + return KeyValueIterators.emptyIterator(); } - }; + }); } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java index 19370b9..4486fda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java @@ -24,11 +24,48 @@ import java.util.NoSuchElementException; class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator; private final HasNextCondition hasNextCondition; + private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> wrappedIterator; FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, - final HasNextCondition hasNextCondition) { + final HasNextCondition hasNextCondition, + final CacheFunction cacheFunction) { this.cacheIterator = cacheIterator; this.hasNextCondition = hasNextCondition; + this.wrappedIterator = new PeekingKeyValueIterator<Bytes, LRUCacheEntry>() { + @Override + public KeyValue<Bytes, LRUCacheEntry> peekNext() { + return cachedPair(cacheIterator.peekNext()); + } + + @Override + public void close() { + cacheIterator.close(); + } + + @Override + public Bytes peekNextKey() { + return cacheFunction.key(cacheIterator.peekNextKey()); + } + + @Override + public boolean hasNext() { + return cacheIterator.hasNext(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> next() { + return cachedPair(cacheIterator.next()); + } + + private KeyValue<Bytes, LRUCacheEntry> cachedPair(KeyValue<Bytes, LRUCacheEntry> next) { + return KeyValue.pair(cacheFunction.key(next.key), next.value); 
+ } + + @Override + public void remove() { + cacheIterator.remove(); + } + }; } @Override @@ -46,7 +83,7 @@ class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEn @Override public boolean hasNext() { - return hasNextCondition.hasNext(cacheIterator); + return hasNextCondition.hasNext(wrappedIterator); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java new file mode 100644 index 0000000..bef6f49 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.util.NoSuchElementException; + +class KeyValueIterators { + + private static class EmptyKeyValueIterator<K, V> implements KeyValueIterator<K, V> { + + @Override + public void close() { + } + + @Override + public K peekNextKey() { + throw new NoSuchElementException(); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public KeyValue<K, V> next() { + throw new NoSuchElementException(); + } + + @Override + public void remove() { + } + } + + private static class EmptyWindowStoreIterator<V> extends EmptyKeyValueIterator<Long, V> + implements WindowStoreIterator<V> { + } + + private static final KeyValueIterator EMPTY_ITERATOR = new EmptyKeyValueIterator(); + private static final WindowStoreIterator EMPTY_WINDOW_STORE_ITERATOR = new EmptyWindowStoreIterator(); + + + @SuppressWarnings("unchecked") + static <K, V> KeyValueIterator<K, V> emptyIterator() { + return (KeyValueIterator<K, V>) EMPTY_ITERATOR; + } + + @SuppressWarnings("unchecked") + static <V> WindowStoreIterator<V> emptyWindowStoreIterator() { + return (WindowStoreIterator<V>) EMPTY_WINDOW_STORE_ITERATOR; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java index 07d5b6e..f7bb2ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java @@ -27,12 +27,15 @@ import org.apache.kafka.streams.state.StateSerdes; * @param <K> * @param <V> */ -class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V> { +class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V, byte[]> { + + private final StateSerdes<K, V> serdes; MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Bytes, byte[]> storeIterator, final StateSerdes<K, V> serdes) { - super(cacheIterator, storeIterator, serdes); + super(cacheIterator, storeIterator); + this.serdes = serdes; } @Override @@ -46,6 +49,11 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedC } @Override + V deserializeCacheValue(final LRUCacheEntry cacheEntry) { + return serdes.valueFrom(cacheEntry.value); + } + + @Override public K deserializeStoreKey(final Bytes key) { return serdes.keyFrom(key.get()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index 3f9b620..67118f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -29,41 +29,46 @@ import org.apache.kafka.streams.state.StateSerdes; * @param <K> * @param <AGG> */ -class MergedSortedCacheSessionStoreIterator<K, AGG> 
extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG> { - private final StateSerdes<K, AGG> rawSerdes; +class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG, byte[]> { + private final StateSerdes<K, AGG> serdes; + private final SegmentedCacheFunction cacheFunction; MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator, - final StateSerdes<K, AGG> serdes) { - super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(), - new SessionKeySerde<>(serdes.keySerde()), - serdes.valueSerde())); - - rawSerdes = serdes; + final StateSerdes<K, AGG> serdes, + final SegmentedCacheFunction cacheFunction) { + super(cacheIterator, storeIterator); + this.serdes = serdes; + this.cacheFunction = cacheFunction; } @Override - public KeyValue<Windowed<K>, AGG> deserializeStorePair(KeyValue<Windowed<Bytes>, byte[]> pair) { - final K key = rawSerdes.keyFrom(pair.key.key().get()); - return KeyValue.pair(new Windowed<>(key, pair.key.window()), serdes.valueFrom(pair.value)); + public KeyValue<Windowed<K>, AGG> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) { + return KeyValue.pair(deserializeStoreKey(pair.key), serdes.valueFrom(pair.value)); } @Override Windowed<K> deserializeCacheKey(final Bytes cacheKey) { - return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic()); + byte[] binaryKey = cacheFunction.key(cacheKey).get(); + return SessionKeySerde.from(binaryKey, serdes.keyDeserializer(), serdes.topic()); + } + + + @Override + AGG deserializeCacheValue(final LRUCacheEntry cacheEntry) { + return serdes.valueFrom(cacheEntry.value); } @Override - public Windowed<K> deserializeStoreKey(Windowed<Bytes> key) { - final K originalKey = rawSerdes.keyFrom(key.key().get()); + public Windowed<K> deserializeStoreKey(final 
Windowed<Bytes> key) { + final K originalKey = serdes.keyFrom(key.key().get()); return new Windowed<>(originalKey, key.window()); } @Override - public int compare(Bytes cacheKey, Windowed<Bytes> storeKey) { + public int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) { Bytes storeKeyBytes = SessionKeySerde.bytesToBinary(storeKey); - return cacheKey.compareTo(storeKeyBytes); + return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes); } } - http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java index 742bcbc..657b601 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java @@ -22,17 +22,22 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStoreIterator; +import static org.apache.kafka.streams.state.internals.SegmentedCacheFunction.bytesFromCacheKey; + /** * Merges two iterators. 
Assumes each of them is sorted by key * * @param <V> */ -class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V> implements WindowStoreIterator<V> { +class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V, byte[]> implements WindowStoreIterator<V> { + + private final StateSerdes<Long, V> serdes; MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Long, byte[]> storeIterator, final StateSerdes<Long, V> serdes) { - super(cacheIterator, storeIterator, serdes); + super(cacheIterator, storeIterator); + this.serdes = serdes; } @Override @@ -42,7 +47,14 @@ class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheS @Override Long deserializeCacheKey(final Bytes cacheKey) { - return WindowStoreUtils.timestampFromBinaryKey(cacheKey.get()); + byte[] binaryKey = bytesFromCacheKey(cacheKey); + + return WindowStoreUtils.timestampFromBinaryKey(binaryKey); + } + + @Override + V deserializeCacheValue(final LRUCacheEntry cacheEntry) { + return serdes.valueFrom(cacheEntry.value); } @Override @@ -52,7 +64,9 @@ class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheS @Override public int compare(final Bytes cacheKey, final Long storeKey) { - final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheKey.get()); + byte[] binaryKey = bytesFromCacheKey(cacheKey); + + final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey); return cacheTimestamp.compareTo(storeKey); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java new file mode 100644 index 0000000..92ad021 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; + +class MergedSortedCacheWindowStoreKeyValueIterator<K, V> + extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, V, byte[]> { + + private final StateSerdes<K, V> serdes; + private final long windowSize; + private final SegmentedCacheFunction cacheFunction; + + MergedSortedCacheWindowStoreKeyValueIterator( + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator, + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator, + final StateSerdes<K, V> serdes, + final long windowSize, + final SegmentedCacheFunction cacheFunction + ) { + super(filteredCacheIterator, underlyingIterator); + this.serdes = serdes; + this.windowSize = windowSize; + this.cacheFunction = cacheFunction; + } + + @Override + Windowed<K> deserializeStoreKey(final Windowed<Bytes> key) { + return new Windowed<>(serdes.keyFrom(key.key().get()), key.window()); + } + + @Override + KeyValue<Windowed<K>, V> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) { + return KeyValue.pair(deserializeStoreKey(pair.key), serdes.valueFrom(pair.value)); + } + + @Override + Windowed<K> deserializeCacheKey(final Bytes cacheKey) { + byte[] binaryKey = cacheFunction.key(cacheKey).get(); + + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey); + final K key = WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes); + return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)); + } + + @Override + V deserializeCacheValue(final LRUCacheEntry cacheEntry) { + return serdes.valueFrom(cacheEntry.value); + } + + @Override + int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) { + Bytes storeKeyBytes 
= WindowStoreUtils.toBinaryKey(storeKey.key().get(), storeKey.window().start(), 0); + return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java index afc18e4..664873a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java @@ -79,11 +79,16 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im } @Override - public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, long timeFrom, long timeTo) { + public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { return new MeteredSegmentedBytesStoreIterator(inner.fetch(key, timeFrom, timeTo), this.fetchTime); } @Override + public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { + return new MeteredSegmentedBytesStoreIterator(inner.fetch(keyFrom, keyTo, from, to), this.fetchTime); + } + + @Override public void remove(final Bytes key) { final long startNs = time.nanoseconds(); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java new file mode 100644 index 
0000000..ace2487 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; + +import java.nio.ByteBuffer; + +class OrderedBytes { + + private static final int MIN_KEY_LENGTH = 1; + /** + * Returns the upper byte range for a key with a given fixed size maximum suffix + * + * Assumes the minimum key length is one byte + */ + static Bytes upperRange(Bytes key, byte[] maxSuffix) { + final byte[] bytes = key.get(); + ByteBuffer rangeEnd = ByteBuffer.allocate(bytes.length + maxSuffix.length); + + int i = 0; + while (i < bytes.length && ( + i < MIN_KEY_LENGTH // assumes keys are at least one byte long + || (bytes[i] & 0xFF) >= (maxSuffix[0] & 0xFF) + )) { + rangeEnd.put(bytes[i++]); + } + + rangeEnd.put(maxSuffix); + rangeEnd.flip(); + + byte[] res = new byte[rangeEnd.remaining()]; + ByteBuffer.wrap(res).put(rangeEnd); + return Bytes.wrap(res); + } + + static Bytes lowerRange(Bytes key, byte[] minSuffix) { + final byte[] bytes = key.get(); + ByteBuffer rangeStart = ByteBuffer.allocate(bytes.length + minSuffix.length); 
+ // any key in the range would start at least with the given prefix to be + // in the range, and have at least SUFFIX_SIZE number of trailing zero bytes. + + // unless there is a maximum key length, you can keep appending more zero bytes + // to keyFrom to create a key that will match the range, yet that would precede + // WindowStoreUtils.toBinaryKey(keyFrom, from, 0) in byte order + return Bytes.wrap( + rangeStart + .put(bytes) + .put(minSuffix) + .array() + ); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 252a55f..f3c4639 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -46,11 +46,23 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) { final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to); - final Bytes binaryFrom = keySchema.lowerRange(key, from); - final Bytes binaryTo = keySchema.upperRange(key, to); + final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from); + final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to); return new SegmentIterator(searchSpace.iterator(), - keySchema.hasNextCondition(key, from, to), + keySchema.hasNextCondition(key, key, from, to), + binaryFrom, binaryTo); + } + + @Override + public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, Bytes keyTo, final long from, final long to) { + final List<Segment> searchSpace = 
keySchema.segmentsToSearch(segments, from, to); + + final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); + final Bytes binaryTo = keySchema.upperRange(keyTo, to); + + return new SegmentIterator(searchSpace.iterator(), + keySchema.hasNextCondition(keyFrom, keyTo, from, to), binaryFrom, binaryTo); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 103bb55..9fde74b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -90,7 +90,15 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @Override public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime); + return findSessions(key, key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch( + Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)), + earliestSessionEndTime, latestSessionStartTime + ); return new WrappedSessionStoreIterator<>(bytesIterator, serdes); } @@ -100,6 +108,11 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i } @Override + public 
KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) { + return findSessions(from, to, 0, Long.MAX_VALUE); + } + + @Override public void remove(final Windowed<K> key) { bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java index 4e618d9..b407a22 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java @@ -50,6 +50,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, public SessionStore<K, V> get() { final SessionKeySchema keySchema = new SessionKeySchema(); + final long segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS); final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name, retentionPeriod, NUM_SEGMENTS, @@ -62,7 +63,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, final RocksDBSessionStore<Bytes, byte[]> sessionStore = RocksDBSessionStore.bytesStore(metered); - return new CachingSessionStore<>(sessionStore, keySerde, valueSerde); + return new CachingSessionStore<>(sessionStore, keySerde, valueSerde, segmentInterval); } if (cached) { @@ -71,7 +72,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, final RocksDBSessionStore<Bytes, byte[]> sessionStore = RocksDBSessionStore.bytesStore(metered); - return new CachingSessionStore<>(sessionStore, keySerde, valueSerde); + return new 
CachingSessionStore<>(sessionStore, keySerde, valueSerde, segmentInterval); } if (logged) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 5e8d0b2..b147894 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -29,19 +30,10 @@ import org.apache.kafka.streams.state.WindowStoreIterator; class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> { - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - protected final SegmentedBytesStore bytesStore; - protected final boolean retainDuplicates; - - private ProcessorContext context; - protected StateSerdes<K, V> serdes; - protected int seqnum = 0; - // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> { - RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) { - super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates); + 
RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) { + super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates, windowSize); } @Override @@ -54,23 +46,41 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @Override public WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, timeFrom, timeTo); - return WrappedWindowStoreIterator.bytesIterator(bytesIterator, serdes); + return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).valuesIterator(); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to, long timeFrom, long timeTo) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(from, to, timeFrom, timeTo); + return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).keyValueIterator(); } } - static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) { - return new RocksDBWindowBytesStore(inner, retainDuplicates); + static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) { + return new RocksDBWindowBytesStore(inner, retainDuplicates, windowSize); } + private final Serde<K> keySerde; + private final Serde<V> valueSerde; + private final boolean retainDuplicates; + protected final long windowSize; + protected final SegmentedBytesStore bytesStore; + + private ProcessorContext context; + protected StateSerdes<K, V> serdes; + protected int seqnum = 0; + RocksDBWindowStore(final SegmentedBytesStore bytesStore, final Serde<K> keySerde, final Serde<V> valueSerde, - final boolean retainDuplicates) { + final boolean retainDuplicates, + final long windowSize) { super(bytesStore); this.keySerde = keySerde; this.valueSerde = 
valueSerde; this.bytesStore = bytesStore; this.retainDuplicates = retainDuplicates; + this.windowSize = windowSize; } @Override @@ -100,7 +110,13 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @Override public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); - return new WrappedWindowStoreIterator<>(bytesIterator, serdes); + return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator(); + } + + @Override + public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); + return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } void maybeUpdateSeqnumForDups() { http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index 1b270a2..b1e0b02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -37,6 +37,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V private final long retentionPeriod; private final boolean retainDuplicates; private final int numSegments; + private final long segmentInterval; private final long windowSize; private final boolean enableCaching; @@ -51,6 +52,7 @@ public class 
RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V this.numSegments = numSegments; this.windowSize = windowSize; this.enableCaching = enableCaching; + this.segmentInterval = Segments.segmentInterval(retentionPeriod, numSegments); } public String name() { @@ -84,9 +86,9 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V private WindowStore<K, V> maybeWrapCaching(final SegmentedBytesStore inner) { final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(inner, "rocksdb-window", time); if (!enableCaching) { - return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates); + return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates, windowSize); } - final RocksDBWindowStore<Bytes, byte[]> windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates); - return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize); + final RocksDBWindowStore<Bytes, byte[]> windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates, windowSize); + return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize, segmentInterval); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 0c3bb53..72ae6e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -37,7 +37,18 @@ public interface SegmentedBytesStore extends StateStore { * @param to latest time to match * @return an iterator over key-value pairs */ - KeyValueIterator<Bytes, byte[]> 
fetch(Bytes key, long from, long to); + KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to); + + /** + * Fetch all records from the segmented store in the provided key range and time range + * from all existing segments + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param from earliest time to match + * @param to latest time to match + * @return an iterator over key-value pairs + */ + KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to); /** * Remove the record with the provided key. The key @@ -75,7 +86,7 @@ public interface SegmentedBytesStore extends StateStore { void init(final String topic); /** - * Given a record-key and a time, construct a Segmented key that represents + * Given a range of record keys and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. * @see SessionKeySchema#upperRange * @see WindowKeySchema#upperRange @@ -86,7 +97,7 @@ public interface SegmentedBytesStore extends StateStore { Bytes upperRange(final Bytes key, final long to); /** - * Given a record-key and a time, construct a Segmented key that represents + * Given a range of record keys and a time, construct a Segmented key that represents * the lower range of keys to search when performing range queries. * @see SessionKeySchema#lowerRange * @see WindowKeySchema#lowerRange @@ -97,6 +108,28 @@ public interface SegmentedBytesStore extends StateStore { Bytes lowerRange(final Bytes key, final long from); /** + * Given a range of fixed size record keys and a time, construct a Segmented key that represents + * the upper range of keys to search when performing range queries. 
+ * @see SessionKeySchema#upperRange + * @see WindowKeySchema#upperRange + * @param key the last key in the range + * @param to the last timestamp in the range + * @return The key that represents the upper range to search for in the store + */ + Bytes upperRangeFixedSize(final Bytes key, final long to); + + /** + * Given a range of fixed size record keys and a time, construct a Segmented key that represents + * the lower range of keys to search when performing range queries. + * @see SessionKeySchema#lowerRange + * @see WindowKeySchema#lowerRange + * @param key the first key in the range + * @param from the first timestamp in the range + * @return The key that represents the lower range to search for in the store + */ + Bytes lowerRangeFixedSize(final Bytes key, final long from); + + /** * Extract the timestamp of the segment from the key. The key is a composite of * the record-key, any timestamps, plus any additional information. * @see SessionKeySchema#lowerRange @@ -108,13 +141,14 @@ public interface SegmentedBytesStore extends StateStore { /** * Create an implementation of {@link HasNextCondition} that knows when - * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations - * @param binaryKey the record-key + * to stop iterating over the Segments. 
Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations + * @param binaryKeyFrom the first key in the range + * @param binaryKeyTo the last key in the range * @param from starting time range * @param to ending time range * @return */ - HasNextCondition hasNextCondition(final Bytes binaryKey, long from, long to); + HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to); /** * Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations to determine http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java new file mode 100644 index 0000000..8571f92 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; + +import java.nio.ByteBuffer; + +class SegmentedCacheFunction implements CacheFunction { + + private static final int SEGMENT_ID_BYTES = 8; + + private final KeySchema keySchema; + private final long segmentInterval; + + SegmentedCacheFunction(KeySchema keySchema, long segmentInterval) { + this.keySchema = keySchema; + this.segmentInterval = segmentInterval; + } + + @Override + public Bytes key(Bytes cacheKey) { + return Bytes.wrap(bytesFromCacheKey(cacheKey)); + } + + @Override + public Bytes cacheKey(Bytes key) { + final byte[] keyBytes = key.get(); + ByteBuffer buf = ByteBuffer.allocate(SEGMENT_ID_BYTES + keyBytes.length); + buf.putLong(segmentId(key)).put(keyBytes); + return Bytes.wrap(buf.array()); + } + + static byte[] bytesFromCacheKey(Bytes cacheKey) { + byte[] binaryKey = new byte[cacheKey.get().length - SEGMENT_ID_BYTES]; + System.arraycopy(cacheKey.get(), SEGMENT_ID_BYTES, binaryKey, 0, binaryKey.length); + return binaryKey; + } + + public long segmentId(Bytes key) { + return keySchema.segmentTimestamp(key) / segmentInterval; + } + + int compareSegmentedKeys(Bytes cacheKey, Bytes storeKey) { + long storeSegmentId = segmentId(storeKey); + long cacheSegmentId = ByteBuffer.wrap(cacheKey.get()).getLong(); + + final int segmentCompare = Long.compare(cacheSegmentId, storeSegmentId); + if (segmentCompare == 0) { + byte[] cacheKeyBytes = cacheKey.get(); + byte[] storeKeyBytes = storeKey.get(); + return Bytes.BYTES_LEXICO_COMPARATOR.compare( + cacheKeyBytes, SEGMENT_ID_BYTES, cacheKeyBytes.length - SEGMENT_ID_BYTES, + storeKeyBytes, 0, storeKeyBytes.length + ); + } else { + return segmentCompare; + } + } +}
