This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8f2307 KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes
Store and Memory LRU Caches (#6327)
a8f2307 is described below
commit a8f2307164ce0f1f47c458eee8f54173f7218a16
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Feb 27 07:08:08 2019 -0800
KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and
Memory LRU Caches (#6327)
Second PR in series to inline the generic parameters of the following bytes
stores
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
<[email protected]>
---
.../org/apache/kafka/streams/state/Stores.java | 3 +-
.../internals/ChangeLoggingKeyValueBytesStore.java | 2 +-
.../streams/state/internals/MemoryLRUCache.java | 76 +++++++---------------
.../state/internals/MemoryNavigableLRUCache.java | 39 +++++------
.../state/internals/RocksDBSessionStore.java | 68 +++++++------------
.../state/internals/RocksDBWindowStore.java | 53 ++++++---------
.../RocksDbSessionBytesStoreSupplier.java | 3 +-
.../internals/RocksDbWindowBytesStoreSupplier.java | 11 ++--
.../internals/WindowStoreIteratorWrapper.java | 47 ++++---------
.../internals/WrappedSessionStoreIterator.java | 17 ++---
.../state/internals/CachingSessionStoreTest.java | 2 +-
.../state/internals/CachingWindowStoreTest.java | 4 +-
.../state/internals/RocksDBSessionStoreTest.java | 35 ++++------
.../state/internals/RocksDBWindowStoreTest.java | 18 -----
14 files changed, 126 insertions(+), 252 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 46a9d45..113e531 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.internals.ApiUtils;
@@ -134,7 +133,7 @@ public class Stores {
@Override
public KeyValueStore<Bytes, byte[]> get() {
- return new MemoryNavigableLRUCache<>(name, maxCacheSize,
Serdes.Bytes(), Serdes.ByteArray());
+ return new MemoryNavigableLRUCache(name, maxCacheSize);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 7567e78..aa931bf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -44,7 +44,7 @@ public class ChangeLoggingKeyValueBytesStore extends
WrappedStateStore<KeyValueS
// if the inner store is an LRU cache, add the eviction listener to
log removed record
if (wrapped() instanceof MemoryLRUCache) {
- ((MemoryLRUCache<Bytes, byte[]>)
wrapped()).setWhenEldestRemoved((key, value) -> {
+ ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
changeLogger.logChange(key, null);
});
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index f0c3c8c..d69df13 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,14 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
import java.util.LinkedHashMap;
import java.util.List;
@@ -32,46 +30,31 @@ import java.util.Objects;
/**
* An in-memory LRU cache store based on HashSet and HashMap.
- *
- * * Note that the use of array-typed keys is discouraged because they result
in incorrect ordering behavior.
- * If you intend to work on byte arrays as key, for example, you may want to
wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code
RocksDBStore<byte[], ...>}.
- *
- * @param <K> The key type
- * @param <V> The value type
*/
-public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
- public interface EldestEntryRemovalListener<K, V> {
- void apply(K key, V value);
+ public interface EldestEntryRemovalListener {
+ void apply(Bytes key, byte[] value);
}
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
private final String name;
- protected final Map<K, V> map;
+ protected final Map<Bytes, byte[]> map;
- private StateSerdes<K, V> serdes;
private boolean restoring = false; // TODO: this is a sub-optimal solution
to avoid logging during restoration.
// in the future we should augment the
StateRestoreCallback with onComplete etc to better resolve this.
private volatile boolean open = true;
- private EldestEntryRemovalListener<K, V> listener;
+ private EldestEntryRemovalListener listener;
- MemoryLRUCache(final String name,
- final int maxCacheSize,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ MemoryLRUCache(final String name, final int maxCacheSize) {
this.name = name;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
// leave room for one extra entry to handle adding an entry before the
oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ this.map = new LinkedHashMap<Bytes, byte[]>(maxCacheSize + 1, 1.01f,
true) {
private static final long serialVersionUID = 1L;
@Override
- protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+ protected boolean removeEldestEntry(final Map.Entry<Bytes, byte[]>
eldest) {
final boolean evict = super.size() > maxCacheSize;
if (evict && !restoring && listener != null) {
listener.apply(eldest.getKey(), eldest.getValue());
@@ -81,7 +64,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K,
V> {
};
}
- void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener)
{
+ void setWhenEldestRemoved(final EldestEntryRemovalListener listener) {
this.listener = listener;
}
@@ -91,24 +74,12 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
}
@Override
- @SuppressWarnings("unchecked")
- public void init(final ProcessorContext context,
- final StateStore root) {
- // construct the serde
- this.serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ public void init(final ProcessorContext context, final StateStore root) {
// register the store
context.register(root, (key, value) -> {
restoring = true;
- // check value for null, to avoid deserialization error.
- if (value == null) {
- delete(serdes.keyFrom(key));
- } else {
- put(serdes.keyFrom(key), serdes.valueFrom(value));
- }
+ put(Bytes.wrap(key), value);
restoring = false;
});
}
@@ -124,28 +95,26 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized V get(final K key) {
+ public synchronized byte[] get(final Bytes key) {
Objects.requireNonNull(key);
return this.map.get(key);
}
@Override
- public synchronized void put(final K key,
- final V value) {
+ public synchronized void put(final Bytes key, final byte[] value) {
Objects.requireNonNull(key);
if (value == null) {
- this.map.remove(key);
+ delete(key);
} else {
this.map.put(key, value);
}
}
@Override
- public synchronized V putIfAbsent(final K key,
- final V value) {
+ public synchronized byte[] putIfAbsent(final Bytes key, final byte[]
value) {
Objects.requireNonNull(key);
- final V originalValue = get(key);
+ final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
}
@@ -153,14 +122,14 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public void putAll(final List<KeyValue<K, V>> entries) {
- for (final KeyValue<K, V> entry : entries) {
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
put(entry.key, entry.value);
}
}
@Override
- public synchronized V delete(final K key) {
+ public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key);
return this.map.remove(key);
}
@@ -169,8 +138,7 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
* @throws UnsupportedOperationException at every invocation
*/
@Override
- public KeyValueIterator<K, V> range(final K from,
- final K to) {
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes
to) {
throw new UnsupportedOperationException("MemoryLRUCache does not
support range() function.");
}
@@ -178,7 +146,7 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
* @throws UnsupportedOperationException at every invocation
*/
@Override
- public KeyValueIterator<K, V> all() {
+ public KeyValueIterator<Bytes, byte[]> all() {
throw new UnsupportedOperationException("MemoryLRUCache does not
support all() function.");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index d7b7b11..c3cc834 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -24,36 +24,37 @@ import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
-public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
+public class MemoryNavigableLRUCache extends MemoryLRUCache {
-
- public MemoryNavigableLRUCache(final String name, final int maxCacheSize,
final Serde<K> keySerde, final Serde<V> valueSerde) {
- super(name, maxCacheSize, keySerde, valueSerde);
+ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
+ super(name, maxCacheSize);
}
@Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
- final TreeMap<K, V> treeMap = toTreeMap();
- return new DelegatingPeekingKeyValueIterator<>(name(), new
MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from,
true, to, true).iterator(), treeMap));
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes
to) {
+ final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+ return new DelegatingPeekingKeyValueIterator<>(name(),
+ new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet()
+ .subSet(from, true, to, true).iterator(), treeMap));
}
@Override
- public KeyValueIterator<K, V> all() {
- final TreeMap<K, V> treeMap = toTreeMap();
- return new
MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().iterator(),
treeMap);
+ public KeyValueIterator<Bytes, byte[]> all() {
+ final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+ return new
MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(),
treeMap);
}
- private synchronized TreeMap<K, V> toTreeMap() {
+ private synchronized TreeMap<Bytes, byte[]> toTreeMap() {
return new TreeMap<>(this.map);
}
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V>
{
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
+ private static class CacheIterator implements KeyValueIterator<Bytes,
byte[]> {
+ private final Iterator<Bytes> keys;
+ private final Map<Bytes, byte[]> entries;
+ private Bytes lastKey;
- public CacheIterator(final Iterator<K> keys, final Map<K, V> entries) {
+ private CacheIterator(final Iterator<Bytes> keys, final Map<Bytes,
byte[]> entries) {
this.keys = keys;
this.entries = entries;
}
@@ -64,7 +65,7 @@ public class MemoryNavigableLRUCache<K, V> extends
MemoryLRUCache<K, V> {
}
@Override
- public KeyValue<K, V> next() {
+ public KeyValue<Bytes, byte[]> next() {
lastKey = keys.next();
return new KeyValue<>(lastKey, entries.get(lastKey));
}
@@ -80,7 +81,7 @@ public class MemoryNavigableLRUCache<K, V> extends
MemoryLRUCache<K, V> {
}
@Override
- public K peekNextKey() {
+ public Bytes peekNextKey() {
throw new UnsupportedOperationException("peekNextKey not
supported");
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index d855442..c9ca423 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -16,90 +16,66 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StateSerdes;
-public class RocksDBSessionStore<K, AGG> extends
WrappedStateStore<SegmentedBytesStore> implements SessionStore<K, AGG> {
+public class RocksDBSessionStore extends
WrappedStateStore<SegmentedBytesStore> implements SessionStore<Bytes, byte[]> {
- private final Serde<K> keySerde;
- private final Serde<AGG> aggSerde;
-
- private StateSerdes<K, AGG> serdes;
- private String topic;
-
- RocksDBSessionStore(final SegmentedBytesStore bytesStore,
- final Serde<K> keySerde,
- final Serde<AGG> aggSerde) {
+ RocksDBSessionStore(final SegmentedBytesStore bytesStore) {
super(bytesStore);
- this.keySerde = keySerde;
- this.aggSerde = aggSerde;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(final ProcessorContext context, final StateStore root) {
- final String storeName = name();
- topic =
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
-
- serdes = new StateSerdes<>(
- topic,
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
-
- super.init(context, root);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final
long earliestSessionEndTime, final long latestSessionStartTime) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes
key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
- Bytes.wrap(serdes.rawKey(key)),
+ key,
earliestSessionEndTime,
latestSessionStartTime
);
- return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
+ return new WrappedSessionStoreIterator(bytesIterator);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo, final long earliestSessionEndTime, final long
latestSessionStartTime) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes
keyFrom,
+ final Bytes
keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
- Bytes.wrap(serdes.rawKey(keyFrom)),
- Bytes.wrap(serdes.rawKey(keyTo)),
+ keyFrom,
+ keyTo,
earliestSessionEndTime,
latestSessionStartTime
);
- return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
+ return new WrappedSessionStoreIterator(bytesIterator);
}
@Override
- public AGG fetchSession(final K key, final long startTime, final long
endTime) {
- return
serdes.valueFrom(wrapped().get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)),
startTime, endTime)));
+ public byte[] fetchSession(final Bytes key, final long startTime, final
long endTime) {
+ return wrapped().get(SessionKeySchema.toBinary(key, startTime,
endTime));
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
return findSessions(key, 0, Long.MAX_VALUE);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to) {
return findSessions(from, to, 0, Long.MAX_VALUE);
}
@Override
- public void remove(final Windowed<K> key) {
- wrapped().remove(Bytes.wrap(SessionKeySchema.toBinary(key,
serdes.keySerializer(), topic)));
+ public void remove(final Windowed<Bytes> key) {
+ wrapped().remove(SessionKeySchema.toBinary(key));
}
@Override
- public void put(final Windowed<K> sessionKey, final AGG aggregate) {
- wrapped().put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey,
serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
+ public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+ wrapped().put(SessionKeySchema.toBinary(sessionKey), aggregate);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index c22ca52..44c9f79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,98 +16,85 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-public class RocksDBWindowStore<K, V> extends
WrappedStateStore<SegmentedBytesStore> implements WindowStore<K, V> {
+public class RocksDBWindowStore extends WrappedStateStore<SegmentedBytesStore>
implements WindowStore<Bytes, byte[]> {
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
private final boolean retainDuplicates;
private final long windowSize;
private ProcessorContext context;
- private StateSerdes<K, V> serdes;
private int seqnum = 0;
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
final boolean retainDuplicates,
final long windowSize) {
super(bytesStore);
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
this.retainDuplicates = retainDuplicates;
this.windowSize = windowSize;
}
@Override
- @SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
this.context = context;
- // construct the serde
- serdes = new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>)
context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>)
context.valueSerde() : valueSerde);
-
super.init(context, root);
}
@Override
- public void put(final K key, final V value) {
+ public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
}
@Override
- public void put(final K key, final V value, final long
windowStartTimestamp) {
+ public void put(final Bytes key, final byte[] value, final long
windowStartTimestamp) {
maybeUpdateSeqnumForDups();
- wrapped().put(WindowKeySchema.toStoreKeyBinary(key,
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
+ wrapped().put(WindowKeySchema.toStoreKeyBinary(key,
windowStartTimestamp, seqnum), value);
}
@Override
- public V fetch(final K key, final long timestamp) {
- final byte[] bytesValue =
wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes));
+ public byte[] fetch(final Bytes key, final long timestamp) {
+ final byte[] bytesValue =
wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum));
if (bytesValue == null) {
return null;
}
- return serdes.valueFrom(bytesValue);
+ return bytesValue;
}
@SuppressWarnings("deprecation")
@Override
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator =
wrapped().fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
- return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).valuesIterator();
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final long
timeFrom, final long timeTo) {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator =
wrapped().fetch(key, timeFrom, timeTo);
+ return new WindowStoreIteratorWrapper(bytesIterator,
windowSize).valuesIterator();
}
@SuppressWarnings("deprecation")
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final long timeFrom, final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator =
wrapped().fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)),
timeFrom, timeTo);
- return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+ final Bytes to,
+ final long timeFrom,
+ final long timeTo) {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator =
wrapped().fetch(from, to, timeFrom, timeTo);
+ return new WindowStoreIteratorWrapper(bytesIterator,
windowSize).keyValueIterator();
}
@Override
- public KeyValueIterator<Windowed<K>, V> all() {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().all();
- return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
+ return new WindowStoreIteratorWrapper(bytesIterator,
windowSize).keyValueIterator();
}
@SuppressWarnings("deprecation")
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long
timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator =
wrapped().fetchAll(timeFrom, timeTo);
- return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
+ return new WindowStoreIteratorWrapper(bytesIterator,
windowSize).keyValueIterator();
}
private void maybeUpdateSeqnumForDups() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index e88755b..8f305db 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
@@ -44,7 +43,7 @@ public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSuppli
retentionPeriod,
segmentIntervalMs(),
new SessionKeySchema());
- return new RocksDBSessionStore<>(segmented, Serdes.Bytes(),
Serdes.ByteArray());
+ return new RocksDBSessionStore(segmented);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index b9b7279..ecdfad2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@@ -54,12 +53,10 @@ public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier
segmentInterval,
new WindowKeySchema()
);
- return new RocksDBWindowStore<>(segmentedBytesStore,
- Serdes.Bytes(),
- Serdes.ByteArray(),
- retainDuplicates,
- windowSize);
-
+ return new RocksDBWindowStore(
+ segmentedBytesStore,
+ retainDuplicates,
+ windowSize);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
index 1feab8f..4095445 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -20,39 +20,33 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStoreIterator;
-class WindowStoreIteratorWrapper<K, V> {
+class WindowStoreIteratorWrapper {
private final KeyValueIterator<Bytes, byte[]> bytesIterator;
- private final StateSerdes<K, V> serdes;
private final long windowSize;
WindowStoreIteratorWrapper(final KeyValueIterator<Bytes, byte[]>
bytesIterator,
- final StateSerdes<K, V> serdes,
final long windowSize) {
this.bytesIterator = bytesIterator;
- this.serdes = serdes;
this.windowSize = windowSize;
}
- public WindowStoreIterator<V> valuesIterator() {
- return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
+ public WindowStoreIterator<byte[]> valuesIterator() {
+ return new WrappedWindowStoreIterator(bytesIterator);
}
- public KeyValueIterator<Windowed<K>, V> keyValueIterator() {
- return new WrappedKeyValueIterator<>(bytesIterator, serdes,
windowSize);
+ public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() {
+ return new WrappedKeyValueIterator(bytesIterator, windowSize);
}
- private static class WrappedWindowStoreIterator<V> implements
WindowStoreIterator<V> {
+ private static class WrappedWindowStoreIterator implements
WindowStoreIterator<byte[]> {
final KeyValueIterator<Bytes, byte[]> bytesIterator;
- final StateSerdes<?, V> serdes;
WrappedWindowStoreIterator(
- final KeyValueIterator<Bytes, byte[]> bytesIterator, final
StateSerdes<?, V> serdes) {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator) {
this.bytesIterator = bytesIterator;
- this.serdes = serdes;
}
@Override
@@ -66,11 +60,10 @@ class WindowStoreIteratorWrapper<K, V> {
}
@Override
- public KeyValue<Long, V> next() {
+ public KeyValue<Long, byte[]> next() {
final KeyValue<Bytes, byte[]> next = bytesIterator.next();
final long timestamp =
WindowKeySchema.extractStoreTimestamp(next.key.get());
- final V value = serdes.valueFrom(next.value);
- return KeyValue.pair(timestamp, value);
+ return KeyValue.pair(timestamp, next.value);
}
@Override
@@ -84,25 +77,20 @@ class WindowStoreIteratorWrapper<K, V> {
}
}
- private static class WrappedKeyValueIterator<K, V> implements
KeyValueIterator<Windowed<K>, V> {
+ private static class WrappedKeyValueIterator implements
KeyValueIterator<Windowed<Bytes>, byte[]> {
final KeyValueIterator<Bytes, byte[]> bytesIterator;
- final StateSerdes<K, V> serdes;
final long windowSize;
WrappedKeyValueIterator(final KeyValueIterator<Bytes, byte[]>
bytesIterator,
- final StateSerdes<K, V> serdes,
final long windowSize) {
this.bytesIterator = bytesIterator;
- this.serdes = serdes;
this.windowSize = windowSize;
}
@Override
- public Windowed<K> peekNextKey() {
+ public Windowed<Bytes> peekNextKey() {
final byte[] nextKey = bytesIterator.peekNextKey().get();
- final long timestamp =
WindowKeySchema.extractStoreTimestamp(nextKey);
- final K key = WindowKeySchema.extractStoreKey(nextKey, serdes);
- return new Windowed<>(key,
WindowKeySchema.timeWindowForSize(timestamp, windowSize));
+ return WindowKeySchema.fromStoreBytesKey(nextKey, windowSize);
}
@Override
@@ -111,16 +99,9 @@ class WindowStoreIteratorWrapper<K, V> {
}
@Override
- public KeyValue<Windowed<K>, V> next() {
+ public KeyValue<Windowed<Bytes>, byte[]> next() {
final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- final long timestamp =
WindowKeySchema.extractStoreTimestamp(next.key.get());
- final K key = WindowKeySchema.extractStoreKey(next.key.get(),
serdes);
- final V value = serdes.valueFrom(next.value);
- return KeyValue.pair(
- new Windowed<>(key,
WindowKeySchema.timeWindowForSize(timestamp, windowSize)),
- value
- );
-
+ return
KeyValue.pair(WindowKeySchema.fromStoreBytesKey(next.key.get(), windowSize),
next.value);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index ce27457..281297c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -20,17 +20,13 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-class WrappedSessionStoreIterator<K, V> implements
KeyValueIterator<Windowed<K>, V> {
+class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>,
byte[]> {
private final KeyValueIterator<Bytes, byte[]> bytesIterator;
- private final StateSerdes<K, V> serdes;
- WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]>
bytesIterator,
- final StateSerdes<K, V> serdes) {
+ WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]>
bytesIterator) {
this.bytesIterator = bytesIterator;
- this.serdes = serdes;
}
@Override
@@ -39,9 +35,8 @@ class WrappedSessionStoreIterator<K, V> implements
KeyValueIterator<Windowed<K>,
}
@Override
- public Windowed<K> peekNextKey() {
- final Bytes bytes = bytesIterator.peekNextKey();
- return SessionKeySchema.from(bytes.get(), serdes.keyDeserializer(),
serdes.topic());
+ public Windowed<Bytes> peekNextKey() {
+ return SessionKeySchema.from(bytesIterator.peekNextKey());
}
@Override
@@ -50,9 +45,9 @@ class WrappedSessionStoreIterator<K, V> implements
KeyValueIterator<Windowed<K>,
}
@Override
- public KeyValue<Windowed<K>, V> next() {
+ public KeyValue<Windowed<Bytes>, byte[]> next() {
final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- return KeyValue.pair(SessionKeySchema.from(next.key.get(),
serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
+ return KeyValue.pair(SessionKeySchema.from(next.key), next.value);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index a6f0a71..1cfdcd7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -71,7 +71,7 @@ public class CachingSessionStoreTest {
public void setUp() {
final SessionKeySchema schema = new SessionKeySchema();
final RocksDBSegmentedBytesStore underlying = new
RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL,
schema);
- final RocksDBSessionStore<Bytes, byte[]> sessionStore = new
RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
+ final RocksDBSessionStore sessionStore = new
RocksDBSessionStore(underlying);
cachingStore = new CachingSessionStore<>(sessionStore,
Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "),
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null,
cache);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index cb3fcd4..2bb758e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -83,10 +83,8 @@ public class CachingWindowStoreTest {
public void setUp() {
keySchema = new WindowKeySchema();
underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope",
0, SEGMENT_INTERVAL, keySchema);
- final RocksDBWindowStore<Bytes, byte[]> windowStore = new
RocksDBWindowStore<>(
+ final RocksDBWindowStore windowStore = new RocksDBWindowStore(
underlying,
- Serdes.Bytes(),
- Serdes.ByteArray(),
false,
WINDOW_SIZE);
cacheListener = new
CachingKeyValueStoreTest.CacheFlushListenerStub<>();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 3653e7e..0786c37 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -36,6 +37,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static java.time.Duration.ofMillis;
import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.valuesToList;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -51,19 +53,12 @@ public class RocksDBSessionStoreTest {
@Before
public void before() {
- final SessionKeySchema schema = new SessionKeySchema();
-
- final RocksDBSegmentedBytesStore bytesStore = new
RocksDBSegmentedBytesStore(
- "session-store",
- "metrics-scope",
- 10_000L,
- 60_000L,
- schema);
-
- sessionStore = new RocksDBSessionStore<>(
- bytesStore,
+ sessionStore = Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(
+ "session-store",
+ ofMillis(10_000L)),
Serdes.String(),
- Serdes.Long());
+ Serdes.Long()).build();
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),
@@ -74,6 +69,7 @@ public class RocksDBSessionStoreTest {
new LogContext("testCache "),
0,
new MockStreamsMetrics(new Metrics())));
+
sessionStore.init(context, sessionStore);
}
@@ -188,17 +184,12 @@ public class RocksDBSessionStoreTest {
@Test
public void shouldFetchExactKeys() {
- final RocksDBSegmentedBytesStore bytesStore = new
RocksDBSegmentedBytesStore(
- "session-store",
- "metrics-scope",
- 0x7a00000000000000L,
- 0x7a00000000000000L,
- new SessionKeySchema());
-
- sessionStore = new RocksDBSessionStore<>(
- bytesStore,
+ sessionStore = Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(
+ "session-store",
+ ofMillis(0x7a00000000000000L)),
Serdes.String(),
- Serdes.Long());
+ Serdes.Long()).build();
sessionStore.init(context, sessionStore);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 32fa0f7..42b1b8c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -1369,24 +1369,6 @@ public class RocksDBWindowStoreTest {
}
@Test
- public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
- windowStore = new RocksDBWindowStore<>(
- new RocksDBSegmentedBytesStore(
- windowName,
- "metrics-scope",
- retentionPeriod,
- segmentInterval,
- new WindowKeySchema()),
- Serdes.Integer(),
- new SerdeThatDoesntHandleNull(),
- false,
- windowSize);
- windowStore.init(context, windowStore);
-
- assertNull(windowStore.fetch(1, 0));
- }
-
- @Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
final WindowStore<Bytes, String> windowStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(windowName, ofMillis(60_000L),
ofMillis(60_000L), true),