This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3b08deaa76 KAFKA-13785: [8/N][emit final] time-ordered session store (#12127) 3b08deaa76 is described below commit 3b08deaa761c2387a41610893dc8302ab1d97338 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu May 5 16:09:16 2022 -0700 KAFKA-13785: [8/N][emit final] time-ordered session store (#12127) Time ordered session store implementation. I introduced AbstractRocksDBTimeOrderedSegmentedBytesStore to make it generic for RocksDBTimeOrderedSessionSegmentedBytesStore and RocksDBTimeOrderedSegmentedBytesStore. A few minor follow-up changes: 1. Avoid extra byte array allocation for fixed upper/lower range serialization. 2. Rename some class names to be more consistent. Authored-by: Hao Li <1127478+lihao...@users.noreply.github.com> Reviewers: Guozhang Wang <wangg...@gmail.com>, John Roesler <vvcep...@apache.org> --- ...stractDualSchemaRocksDBSegmentedBytesStore.java | 3 - ...ractRocksDBTimeOrderedSegmentedBytesStore.java} | 111 ++---- .../state/internals/PrefixedSessionKeySchemas.java | 387 +++++++++++++++++++ .../state/internals/PrefixedWindowKeySchemas.java | 6 +- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 136 +++++++ .../internals/RocksDBTimeOrderedSessionStore.java | 156 ++++++++ ...ocksDBTimeOrderedWindowSegmentedBytesStore.java | 127 +++++++ .../internals/RocksDBTimeOrderedWindowStore.java | 4 +- ...IndexedTimeOrderedWindowBytesStoreSupplier.java | 4 +- ...ocksDbTimeOrderedSessionBytesStoreSupplier.java | 69 ++++ .../streams/state/internals/SessionKeySchema.java | 40 +- .../internals/WrappedSessionStoreIterator.java | 12 +- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 411 ++++++++++++++++++++- .../state/internals/RocksDBSessionStoreTest.java | 68 +++- .../RocksDBTimeOrderedSegmentedBytesStoreTest.java | 74 ---- ...DBTimeOrderedWindowSegmentedBytesStoreTest.java | 121 ++++++ .../state/internals/RocksDBWindowStoreTest.java | 68 ++-- 
...xedTimeOrderedWindowBytesStoreSupplierTest.java | 8 +- .../state/internals/SessionKeySchemaTest.java | 223 ++++++++--- .../internals/TimeOrderedWindowStoreTest.java | 4 +- .../state/internals/WindowKeySchemaTest.java | 10 +- 21 files changed, 1757 insertions(+), 285 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index b1044eb49c..95c1d8d8c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -50,7 +50,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg private final String name; protected final AbstractSegments<S> segments; - private final String metricScope; protected final KeySchema baseKeySchema; protected final Optional<KeySchema> indexKeySchema; @@ -65,12 +64,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg private volatile boolean open; AbstractDualSchemaRocksDBSegmentedBytesStore(final String name, - final String metricScope, final KeySchema baseKeySchema, final Optional<KeySchema> indexKeySchema, final AbstractSegments<S> segments) { this.name = name; - this.metricScope = metricScope; this.baseKeySchema = baseKeySchema; this.indexKeySchema = indexKeySchema; this.segments = segments; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java similarity index 65% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java rename to 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java index e87af877fb..f7216412f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java @@ -16,22 +16,12 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; -import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +30,15 @@ import org.slf4j.LoggerFactory; * lookup for a specific key. * * Schema for first SegmentedBytesStore (base store) is as below: - * Key schema: | timestamp + recordkey | + * Key schema: | timestamp + [timestamp] + recordkey | * Value schema: | value |. Value here is determined by caller. * * Schema for second SegmentedBytesStore (index store) is as below: - * Key schema: | record + timestamp | + * Key schema: | record + timestamp + [timestamp]| * Value schema: || * + * Note there could be two timestamps if we store both window end time and window start time. + * * Operations: * Put: 1. Put to index store. 2. Put to base store. * Delete: 1. Delete from base store. 
2. Delete from index store. @@ -59,11 +51,13 @@ import org.slf4j.LoggerFactory; * Index store can be optional if we can construct the timestamp in base store instead of looking * them up from index store. * + * @see RocksDBTimeOrderedSessionSegmentedBytesStore + * @see RocksDBTimeOrderedWindowSegmentedBytesStore */ -public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> { +public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> { private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class); - private class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> { + abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> { private final KeyValueIterator<Bytes, byte[]> indexIterator; private byte[] cachedValue; @@ -95,7 +89,7 @@ public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRoc if (cachedValue == null) { // Key not in base store, inconsistency happened and remove from index. 
indexIterator.next(); - RocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key); + AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key); } else { return true; } @@ -114,84 +108,19 @@ public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRoc return KeyValue.pair(getBaseKey(ret.key), value); } - private Bytes getBaseKey(final Bytes indexKey) { - final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get()); - final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get()); - final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get()); - return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum); - } + abstract protected Bytes getBaseKey(final Bytes indexKey); } - RocksDBTimeOrderedSegmentedBytesStore(final String name, - final String metricsScope, - final long retention, - final long segmentInterval, - final boolean withIndex) { - super(name, metricsScope, new TimeFirstWindowKeySchema(), - Optional.ofNullable(withIndex ? 
new KeyFirstWindowKeySchema() : null), + AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final KeySchema baseKeySchema, + final Optional<KeySchema> indexKeySchema) { + super(name, baseKeySchema, indexKeySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval)); } - public void put(final Bytes key, final long timestamp, final int seqnum, final byte[] value) { - final Bytes baseKey = TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum); - put(baseKey, value); - } - - byte[] fetch(final Bytes key, final long timestamp, final int seqnum) { - return get(TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum)); - } - - @Override - protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { - final byte[] key = TimeFirstWindowKeySchema.extractStoreKeyBytes(baseKey.get()); - final long timestamp = TimeFirstWindowKeySchema.extractStoreTimestamp(baseKey.get()); - final int seqnum = TimeFirstWindowKeySchema.extractStoreSequence(baseKey.get()); - - return KeyValue.pair(KeyFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum), new byte[0]); - } - - @Override - Map<KeyValueSegment, WriteBatch> getWriteBatches( - final Collection<ConsumerRecord<byte[], byte[]>> records) { - // advance stream time to the max timestamp in the batch - for (final ConsumerRecord<byte[], byte[]> record : records) { - final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key()); - observedStreamTime = Math.max(observedStreamTime, timestamp); - } - - final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>(); - for (final ConsumerRecord<byte[], byte[]> record : records) { - final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key()); - final long segmentId = segments.segmentId(timestamp); - final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, 
observedStreamTime); - if (segment != null) { - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); - try { - final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch()); - - // Assuming changelog record is serialized using WindowKeySchema - // from ChangeLoggingTimestampedWindowBytesStore. Reconstruct key/value to restore - if (hasIndex()) { - final byte[] indexKey = KeyFirstWindowKeySchema.fromNonPrefixWindowKey(record.key()); - // Take care of tombstone - final byte[] value = record.value() == null ? null : new byte[0]; - segment.addToBatch(new KeyValue<>(indexKey, value), batch); - } - - final byte[] baseKey = TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key()); - segment.addToBatch(new KeyValue<>(baseKey, record.value()), batch); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + name(), e); - } - } - } - return writeBatchMap; - } - @Override public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, @@ -206,6 +135,8 @@ public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRoc return fetch(key, from, to, false); } + abstract protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(final SegmentIterator<KeyValueSegment> segmentIterator); + KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to, @@ -217,7 +148,7 @@ public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRoc final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from); final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to); - return new IndexToBaseStoreIterator(new SegmentIterator<>( + return getIndexToBaseStoreIterator(new SegmentIterator<>( searchSpace.iterator(), indexKeySchema.get().hasNextCondition(key, key, from, to, forward), binaryFrom, @@ -275,7 +206,7 @@ public class 
RocksDBTimeOrderedSegmentedBytesStore extends AbstractDualSchemaRoc final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, from); final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to); - return new IndexToBaseStoreIterator(new SegmentIterator<>( + return getIndexToBaseStoreIterator(new SegmentIterator<>( searchSpace.iterator(), indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from, to, forward), binaryFrom, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java new file mode 100644 index 0000000000..c98ae83390 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; + +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; + +public class PrefixedSessionKeySchemas { + + private static final int PREFIX_SIZE = 1; + private static final byte TIME_FIRST_PREFIX = 0; + private static final byte KEY_FIRST_PREFIX = 1; + + private static byte extractPrefix(final byte[] binaryBytes) { + return binaryBytes[0]; + } + + public static class TimeFirstSessionKeySchema implements KeySchema { + + @Override + public Bytes upperRange(final Bytes key, final long to) { + if (key == null) { + // Put next prefix instead of null so that we can start from right prefix + // when scanning backwards + final byte nextPrefix = TIME_FIRST_PREFIX + 1; + return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array()); + } + return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + 2 * TIMESTAMP_SIZE + key.get().length) + .put(TIME_FIRST_PREFIX) + // the end timestamp can be as large as possible as long as it's larger than start time + .putLong(Long.MAX_VALUE) + // this is the start timestamp + .putLong(to) + .put(key.get()) + .array()); + } + + @Override + public Bytes lowerRange(final Bytes key, final long from) { + if (key == null) { + return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE) + .put(TIME_FIRST_PREFIX) + .putLong(from) + .array()); + } + + return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + 2 * TIMESTAMP_SIZE + key.get().length) + .put(TIME_FIRST_PREFIX) + .putLong(from) + .putLong(0L) + 
.put(key.get()) + .array()); + } + + /** + * + * @param key the key in the range + * @param to the latest start time + * @return + */ + @Override + public Bytes upperRangeFixedSize(final Bytes key, final long to) { + return toBinary(key, to, Long.MAX_VALUE); + } + + /** + * + * @param key the key in the range + * @param from the earliest end timestamp in the range + * @return + */ + @Override + public Bytes lowerRangeFixedSize(final Bytes key, final long from) { + return toBinary(key, 0, Math.max(0, from)); + } + + @Override + public long segmentTimestamp(final Bytes key) { + return extractEndTimestamp(key.get()); + } + + @Override + public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, + final Bytes binaryKeyTo, final long from, final long to, final boolean forward) { + return iterator -> { + while (iterator.hasNext()) { + final Bytes bytes = iterator.peekNextKey(); + final byte prefix = extractPrefix(bytes.get()); + + if (prefix != TIME_FIRST_PREFIX) { + return false; + } + + final Windowed<Bytes> windowedKey = from(bytes); + final long endTime = windowedKey.window().end(); + final long startTime = windowedKey.window().start(); + + // We can return false directly here since keys are sorted by end time and if + // we get time smaller than `from`, there won't be time within range. 
+ if (!forward && endTime < from) { + return false; + } + + if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0) + && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0) + && endTime >= from && startTime <= to) { + return true; + } + iterator.next(); + } + return false; + }; + } + + @Override + public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, + final long from, + final long to, + final boolean forward) { + return segments.segments(from, Long.MAX_VALUE, forward); + } + + static long extractStartTimestamp(final byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getLong(PREFIX_SIZE + TIMESTAMP_SIZE); + } + + static long extractEndTimestamp(final byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getLong(PREFIX_SIZE); + } + + private static <K> K extractKey(final byte[] binaryKey, + final Deserializer<K> deserializer, + final String topic) { + return deserializer.deserialize(topic, extractKeyBytes(binaryKey)); + } + + static byte[] extractKeyBytes(final byte[] binaryKey) { + final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE - PREFIX_SIZE]; + System.arraycopy(binaryKey, PREFIX_SIZE + 2 * TIMESTAMP_SIZE, bytes, 0, bytes.length); + return bytes; + } + + static Window extractWindow(final byte[] binaryKey) { + final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); + final long start = buffer.getLong(PREFIX_SIZE + TIMESTAMP_SIZE); + final long end = buffer.getLong(PREFIX_SIZE); + return new SessionWindow(start, end); + } + + public static Windowed<Bytes> from(final Bytes bytesKey) { + final byte[] binaryKey = bytesKey.get(); + final Window window = extractWindow(binaryKey); + return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), window); + } + + public static <K> Windowed<K> from(final byte[] binaryKey, + final Deserializer<K> keyDeserializer, + final String topic) { + final K key = extractKey(binaryKey, keyDeserializer, topic); + final Window window = 
extractWindow(binaryKey); + return new Windowed<>(key, window); + } + + public static <K> byte[] toBinary(final Windowed<K> sessionKey, + final Serializer<K> serializer, + final String topic) { + final byte[] bytes = serializer.serialize(topic, sessionKey.key()); + return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), sessionKey.window().end()).get(); + } + + public static Bytes toBinary(final Windowed<Bytes> sessionKey) { + return toBinary(sessionKey.key(), sessionKey.window().start(), sessionKey.window().end()); + } + + // for time prefixed schema, like the session key schema we need to put time stamps first, then the key + // and hence we need to override the write binary function with the write reordering + public static void writeBinary(final ByteBuffer buf, + final Bytes key, + final long startTime, + final long endTime) { + buf.putLong(endTime); + buf.putLong(startTime); + buf.put(key.get()); + } + + public static Bytes toBinary(final Bytes key, + final long startTime, + final long endTime) { + final ByteBuffer buf = ByteBuffer.allocate(PREFIX_SIZE + SessionKeySchema.keyByteLength(key)); + buf.put(TIME_FIRST_PREFIX); + writeBinary(buf, key, startTime, endTime); + return Bytes.wrap(buf.array()); + } + + public static byte[] extractWindowBytesFromNonPrefixSessionKey(final byte[] binaryKey) { + final ByteBuffer buffer = ByteBuffer.allocate(PREFIX_SIZE + binaryKey.length).put(TIME_FIRST_PREFIX); + // Put timestamp + buffer.put(binaryKey, binaryKey.length - 2 * TIMESTAMP_SIZE, 2 * TIMESTAMP_SIZE); + buffer.put(binaryKey, 0, binaryKey.length - 2 * TIMESTAMP_SIZE); + + return buffer.array(); + } + } + + public static class KeyFirstSessionKeySchema implements KeySchema { + + @Override + public Bytes upperRange(final Bytes key, final long to) { + final Bytes noPrefixBytes = new SessionKeySchema().upperRange(key, to); + return wrapPrefix(noPrefixBytes, true); + } + + @Override + public Bytes lowerRange(final Bytes key, final long from) { + final Bytes 
noPrefixBytes = new SessionKeySchema().lowerRange(key, from); + // Wrap at least prefix even key is null + return wrapPrefix(noPrefixBytes, false); + } + + @Override + public Bytes upperRangeFixedSize(final Bytes key, final long to) { + final ByteBuffer buffer = ByteBuffer.allocate(PREFIX_SIZE + SessionKeySchema.keyByteLength(key)); + buffer.put(KEY_FIRST_PREFIX); + SessionKeySchema.writeBinary(buffer, SessionKeySchema.upperRangeFixedWindow(key, to)); + return Bytes.wrap(buffer.array()); + } + + @Override + public Bytes lowerRangeFixedSize(final Bytes key, final long from) { + final ByteBuffer buffer = ByteBuffer.allocate(PREFIX_SIZE + SessionKeySchema.keyByteLength(key)); + buffer.put(KEY_FIRST_PREFIX); + SessionKeySchema.writeBinary(buffer, SessionKeySchema.lowerRangeFixedWindow(key, from)); + return Bytes.wrap(buffer.array()); + } + + @Override + public long segmentTimestamp(final Bytes key) { + return extractEndTimestamp(key.get()); + } + + @Override + public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, + final Bytes binaryKeyTo, + final long from, + final long to, + final boolean forward) { + return iterator -> { + while (iterator.hasNext()) { + final Bytes bytes = iterator.peekNextKey(); + final byte prefix = extractPrefix(bytes.get()); + + if (prefix != KEY_FIRST_PREFIX) { + return false; + } + + final Windowed<Bytes> windowedKey = from(bytes); + final long endTime = windowedKey.window().end(); + final long startTime = windowedKey.window().start(); + + if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0) + && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0) + && endTime >= from + && startTime <= to) { + return true; + } + iterator.next(); + } + return false; + }; + } + + @Override + public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, + final long from, + final long to, + final boolean forward) { + return segments.segments(from, Long.MAX_VALUE, forward); + } + + static 
Window extractWindow(final byte[] binaryKey) { + final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); + final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); + final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); + return new SessionWindow(start, end); + } + + static byte[] extractKeyBytes(final byte[] binaryKey) { + final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE - PREFIX_SIZE]; + System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length); + return bytes; + } + + public static Windowed<Bytes> from(final Bytes bytesKey) { + final byte[] binaryKey = bytesKey.get(); + final Window window = extractWindow(binaryKey); + return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), window); + } + + private static <K> K extractKey(final byte[] binaryKey, + final Deserializer<K> deserializer, + final String topic) { + return deserializer.deserialize(topic, extractKeyBytes(binaryKey)); + } + + public static <K> Windowed<K> from(final byte[] binaryKey, + final Deserializer<K> keyDeserializer, + final String topic) { + final K key = extractKey(binaryKey, keyDeserializer, topic); + final Window window = extractWindow(binaryKey); + return new Windowed<>(key, window); + } + + static long extractStartTimestamp(final byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE); + } + + static long extractEndTimestamp(final byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); + } + + public static Bytes toBinary(final Windowed<Bytes> sessionKey) { + return toBinary(sessionKey.key(), sessionKey.window().start(), sessionKey.window().end()); + } + + public static <K> byte[] toBinary(final Windowed<K> sessionKey, + final Serializer<K> serializer, + final String topic) { + final byte[] bytes = serializer.serialize(topic, sessionKey.key()); + return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), sessionKey.window().end()).get(); 
+ } + + public static Bytes toBinary(final Bytes key, + final long startTime, + final long endTime) { + final ByteBuffer buf = ByteBuffer.allocate(PREFIX_SIZE + SessionKeySchema.keyByteLength(key)); + buf.put(KEY_FIRST_PREFIX); + SessionKeySchema.writeBinary(buf, key, startTime, endTime); + return Bytes.wrap(buf.array()); + } + + private static Bytes wrapPrefix(final Bytes noPrefixKey, final boolean upperRange) { + // Need to scan from prefix even key is null + if (noPrefixKey == null) { + final byte prefix = upperRange ? KEY_FIRST_PREFIX + 1 : KEY_FIRST_PREFIX; + final byte[] ret = ByteBuffer.allocate(PREFIX_SIZE) + .put(prefix) + .array(); + return Bytes.wrap(ret); + } + final byte[] ret = ByteBuffer.allocate(PREFIX_SIZE + noPrefixKey.get().length) + .put(KEY_FIRST_PREFIX) + .put(noPrefixKey.get()) + .array(); + return Bytes.wrap(ret); + } + + public static byte[] prefixNonPrefixSessionKey(final byte[] binaryKey) { + assert binaryKey != null; + + return wrapPrefix(Bytes.wrap(binaryKey), false).get(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java index 6304e4bd1d..47cf4b49b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java @@ -45,7 +45,7 @@ public class PrefixedWindowKeySchemas { return binaryBytes.length > 0 && binaryBytes[0] == TIME_FIRST_PREFIX; } - public static class TimeFirstWindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + public static class TimeFirstWindowKeySchema implements KeySchema { @Override public Bytes upperRange(final Bytes key, final long to) { @@ -238,8 +238,6 @@ public class PrefixedWindowKeySchemas { public static class KeyFirstWindowKeySchema implements KeySchema { - - @Override public Bytes upperRange(final 
Bytes key, final long to) { final Bytes noPrefixBytes = new WindowKeySchema().upperRange(key, to); @@ -267,7 +265,7 @@ public class PrefixedWindowKeySchemas { @Override public long segmentTimestamp(final Bytes key) { - return KeyFirstWindowKeySchema.extractStoreTimestamp(key.get()); + return extractStoreTimestamp(key.get()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java new file mode 100644 index 0000000000..4265150eb9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for session key schema. + */ +public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + + private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator { + SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) { + super(indexIterator); + } + + @Override + protected Bytes getBaseKey(final Bytes indexKey) { + final Window window = KeyFirstSessionKeySchema.extractWindow(indexKey.get()); + final byte[] key = KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get()); + + return TimeFirstSessionKeySchema.toBinary(Bytes.wrap(key), window.start(), window.end()); + } + } + + RocksDBTimeOrderedSessionSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { + super(name, metricsScope, retention, segmentInterval, new TimeFirstSessionKeySchema(), + Optional.ofNullable(withIndex ? 
new KeyFirstSessionKeySchema() : null)); + } + + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return get(TimeFirstSessionKeySchema.toBinary( + key, + earliestSessionEndTime, + latestSessionStartTime + )); + } + + public void remove(final Windowed<Bytes> key) { + remove(TimeFirstSessionKeySchema.toBinary(key)); + } + + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { + put(TimeFirstSessionKeySchema.toBinary(sessionKey), aggregate); + } + + @Override + protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { + final Window window = TimeFirstSessionKeySchema.extractWindow(baseKey.get()); + final byte[] key = TimeFirstSessionKeySchema.extractKeyBytes(baseKey.get()); + return KeyValue.pair(KeyFirstSessionKeySchema.toBinary(Bytes.wrap(key), window.start(), window.end()), new byte[0]); + } + + @Override + Map<KeyValueSegment, WriteBatch> getWriteBatches( + final Collection<ConsumerRecord<byte[], byte[]>> records) { + // advance stream time to the max timestamp in the batch + for (final ConsumerRecord<byte[], byte[]> record : records) { + final long timestamp = SessionKeySchema.extractEndTimestamp(record.key()); + observedStreamTime = Math.max(observedStreamTime, timestamp); + } + + final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>(); + for (final ConsumerRecord<byte[], byte[]> record : records) { + final long timestamp = SessionKeySchema.extractEndTimestamp(record.key()); + final long segmentId = segments.segmentId(timestamp); + final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); + if (segment != null) { + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + try { + final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch()); + + // Assuming changelog record is 
serialized using SessionKeySchema + // from ChangeLoggingSessionBytesStore. Reconstruct key/value to restore + if (hasIndex()) { + final byte[] indexKey = KeyFirstSessionKeySchema.prefixNonPrefixSessionKey(record.key()); + // Take care of tombstone + final byte[] value = record.value() == null ? null : new byte[0]; + segment.addToBatch(new KeyValue<>(indexKey, value), batch); + } + + final byte[] baseKey = TimeFirstSessionKeySchema.extractWindowBytesFromNonPrefixSessionKey(record.key()); + segment.addToBatch(new KeyValue<>(baseKey, record.value()), batch); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + name(), e); + } + } + } + return writeBatchMap; + } + + @Override + protected IndexToBaseStoreIterator getIndexToBaseStoreIterator( + final SegmentIterator<KeyValueSegment> segmentIterator) { + return new SessionKeySchemaIndexToBaseStoreIterator(segmentIterator); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java new file mode 100644 index 0000000000..5b72163757 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; + +public class RocksDBTimeOrderedSessionStore + extends WrappedStateStore<RocksDBTimeOrderedSessionSegmentedBytesStore, Object, Object> + implements SessionStore<Bytes, byte[]> { + + private StateStoreContext stateStoreContext; + + RocksDBTimeOrderedSessionStore(final RocksDBTimeOrderedSessionSegmentedBytesStore store) { + super(store); + Objects.requireNonNull(store, "store is null"); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + wrapped().init(context, root); + this.stateStoreContext = context; + } + + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + config, + this, + getPosition(), + stateStoreContext + ); + } + + @Override + public 
KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); + } + + @Override + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().fetchSession( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> 
fetch(final Bytes key) { + return findSessions(key, 0, Long.MAX_VALUE); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return backwardFindSessions(key, 0, Long.MAX_VALUE); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); + } + + @Override + public void remove(final Windowed<Bytes> sessionKey) { + wrapped().remove(sessionKey); + } + + @Override + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { + wrapped().put(sessionKey, aggregate); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java new file mode 100644 index 0000000000..b44588da2b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for window key schema. + */ +public class RocksDBTimeOrderedWindowSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + + private class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator { + WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) { + super(indexIterator); + } + + @Override + protected Bytes getBaseKey(final Bytes indexKey) { + final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get()); + final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get()); + final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get()); + return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum); + } + } + + RocksDBTimeOrderedWindowSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { + super(name, metricsScope, retention, segmentInterval, new 
TimeFirstWindowKeySchema(), + Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null)); + } + + public void put(final Bytes key, final long timestamp, final int seqnum, final byte[] value) { + final Bytes baseKey = TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum); + put(baseKey, value); + } + + byte[] fetch(final Bytes key, final long timestamp, final int seqnum) { + return get(TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum)); + } + + @Override + protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { + final byte[] key = TimeFirstWindowKeySchema.extractStoreKeyBytes(baseKey.get()); + final long timestamp = TimeFirstWindowKeySchema.extractStoreTimestamp(baseKey.get()); + final int seqnum = TimeFirstWindowKeySchema.extractStoreSequence(baseKey.get()); + + return KeyValue.pair(KeyFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum), new byte[0]); + } + + @Override + Map<KeyValueSegment, WriteBatch> getWriteBatches( + final Collection<ConsumerRecord<byte[], byte[]>> records) { + // advance stream time to the max timestamp in the batch + for (final ConsumerRecord<byte[], byte[]> record : records) { + final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key()); + observedStreamTime = Math.max(observedStreamTime, timestamp); + } + + final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>(); + for (final ConsumerRecord<byte[], byte[]> record : records) { + final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key()); + final long segmentId = segments.segmentId(timestamp); + final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); + if (segment != null) { + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + try { + final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch()); + + // Assuming 
changelog record is serialized using WindowKeySchema + // from ChangeLoggingTimestampedWindowBytesStore. Reconstruct key/value to restore + if (hasIndex()) { + final byte[] indexKey = KeyFirstWindowKeySchema.fromNonPrefixWindowKey(record.key()); + // Take care of tombstone + final byte[] value = record.value() == null ? null : new byte[0]; + segment.addToBatch(new KeyValue<>(indexKey, value), batch); + } + + final byte[] baseKey = TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key()); + segment.addToBatch(new KeyValue<>(baseKey, record.value()), batch); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + name(), e); + } + } + } + return writeBatchMap; + } + + @Override + protected IndexToBaseStoreIterator getIndexToBaseStoreIterator( + final SegmentIterator<KeyValueSegment> segmentIterator) { + return new WindowKeySchemaIndexToBaseStoreIterator(segmentIterator); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java index 4f2587d1e0..598b3d077e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFir public class RocksDBTimeOrderedWindowStore - extends WrappedStateStore<RocksDBTimeOrderedSegmentedBytesStore, Object, Object> + extends WrappedStateStore<RocksDBTimeOrderedWindowSegmentedBytesStore, Object, Object> implements WindowStore<Bytes, byte[]>, TimestampedBytesStore { private final boolean retainDuplicates; @@ -43,7 +43,7 @@ public class RocksDBTimeOrderedWindowStore private int seqnum = 0; RocksDBTimeOrderedWindowStore( - final 
RocksDBTimeOrderedSegmentedBytesStore store, + final RocksDBTimeOrderedWindowSegmentedBytesStore store, final boolean retainDuplicates, final long windowSize ) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java index af5417fccf..ac0b82f99f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java @@ -106,7 +106,7 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window switch (windowStoreType) { case DEFAULT_WINDOW_STORE: return new RocksDBTimeOrderedWindowStore( - new RocksDBTimeOrderedSegmentedBytesStore( + new RocksDBTimeOrderedWindowSegmentedBytesStore( name, metricsScope(), retentionPeriod, @@ -116,7 +116,7 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window windowSize); case INDEXED_WINDOW_STORE: return new RocksDBTimeOrderedWindowStore( - new RocksDBTimeOrderedSegmentedBytesStore( + new RocksDBTimeOrderedWindowSegmentedBytesStore( name, metricsScope(), retentionPeriod, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java new file mode 100644 index 0000000000..60cd710e6a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; + +public class RocksDbTimeOrderedSessionBytesStoreSupplier implements SessionBytesStoreSupplier { + private final String name; + private final long retentionPeriod; + private final boolean withIndex; + + public RocksDbTimeOrderedSessionBytesStoreSupplier(final String name, + final long retentionPeriod, + final boolean withIndex) { + this.name = name; + this.retentionPeriod = retentionPeriod; + this.withIndex = withIndex; + } + + @Override + public String name() { + return name; + } + + @Override + public SessionStore<Bytes, byte[]> get() { + return new RocksDBTimeOrderedSessionStore( + new RocksDBTimeOrderedSessionSegmentedBytesStore( + name, + metricsScope(), + retentionPeriod, + segmentIntervalMs(), + withIndex + ) + ); + } + + @Override + public String metricsScope() { + return "rocksdb-session"; + } + + @Override + public long segmentIntervalMs() { + // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable. 
+ return Math.max(retentionPeriod / 2, 60_000L); + } + + @Override + public long retentionPeriod() { + return retentionPeriod; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index d4196a9ede..505bbddc80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -34,18 +34,30 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { private static final int SUFFIX_SIZE = 2 * TIMESTAMP_SIZE; private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; + public static int keyByteLength(final Bytes key) { + return key.get().length + 2 * TIMESTAMP_SIZE; + } + @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { - final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); + final Windowed<Bytes> sessionKey = upperRangeFixedWindow(key, to); return SessionKeySchema.toBinary(sessionKey); } + public static <K> Windowed<K> upperRangeFixedWindow(final K key, final long to) { + return new Windowed<K>(key, new SessionWindow(to, Long.MAX_VALUE)); + } + @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { - final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); + final Windowed<Bytes> sessionKey = lowerRangeFixedWindow(key, from); return SessionKeySchema.toBinary(sessionKey); } + public static <K> Windowed<K> lowerRangeFixedWindow(final K key, final long from) { + return new Windowed<K>(key, new SessionWindow(0, Math.max(0, from))); + } + @Override public Bytes upperRange(final Bytes key, final long to) { if (key == null) { @@ -161,11 +173,27 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { public static Bytes toBinary(final Bytes key, 
final long startTime, final long endTime) { - final byte[] bytes = key.get(); - final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); - buf.put(bytes); + final ByteBuffer buf = ByteBuffer.allocate(keyByteLength(key)); + writeBinary(buf, key, startTime, endTime); + return Bytes.wrap(buf.array()); + } + + public static void writeBinary(final ByteBuffer buf, final Windowed<Bytes> sessionKey) { + writeBinary(buf, sessionKey.key(), sessionKey.window().start(), sessionKey.window().end()); + } + + public static void writeBinary(final ByteBuffer buf, + final Bytes key, + final long startTime, + final long endTime) { + // we search for the session window that can overlap with the [ESET, LSST] range + // since the session window length can vary, we define the search boundary as: + // lower: [0, ESET] + // upper: [LSST, INF] + // and by putting the end time first and then the start time, the serialized search boundary + // is: [(ESET-0), (INF-LSST)] + buf.put(key.get()); + buf.putLong(endTime); + buf.putLong(startTime); - return Bytes.wrap(buf.array()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java index ce26029af4..3a39a95966 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -24,9 +25,16 @@ import org.apache.kafka.streams.state.KeyValueIterator; class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, b
+ private final Function<Bytes, Windowed<Bytes>> windowConstructor; WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator) { + this(bytesIterator, SessionKeySchema::from); + } + + WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, + final Function<Bytes, Windowed<Bytes>> windowConstructor) { this.bytesIterator = bytesIterator; + this.windowConstructor = windowConstructor; } @Override @@ -36,7 +44,7 @@ class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, b @Override public Windowed<Bytes> peekNextKey() { - return SessionKeySchema.from(bytesIterator.peekNextKey()); + return windowConstructor.apply(bytesIterator.peekNextKey()); } @Override @@ -47,6 +55,6 @@ class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, b @Override public KeyValue<Windowed<Bytes>, byte[]> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - return KeyValue.pair(SessionKeySchema.from(next.key), next.value); + return KeyValue.pair(windowConstructor.apply(next.key), next.value); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index e8d578d017..0641392b2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import 
org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; @@ -48,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; @@ -98,7 +101,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends private AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore; private File stateDir; private final Window[] windows = new Window[4]; - private Window nextSegmentWindow; + private Window nextSegmentWindow, startEdgeWindow, endEdgeWindow; + private final long startEdgeTime = Long.MAX_VALUE - 700L; + private final long endEdgeTime = Long.MAX_VALUE - 600L; final long retention = 1000; final long segmentInterval = 60_000L; @@ -106,6 +111,20 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends @Before public void before() { + if (getBaseSchema() instanceof TimeFirstSessionKeySchema) { + windows[0] = new SessionWindow(10L, 10L); + windows[1] = new SessionWindow(500L, 1000L); + windows[2] = new SessionWindow(1_000L, 1_500L); + windows[3] = new SessionWindow(30_000L, 60_000L); + // All four of the previous windows will go into segment 1. 
 + // The nextSegmentWindow is computed to be a high enough time that when it gets written + // to the segment store, it will advance stream time past the first segment's retention time and + // expire it. + nextSegmentWindow = new SessionWindow(segmentInterval + retention, segmentInterval + retention); + + startEdgeWindow = new SessionWindow(0L, startEdgeTime); + endEdgeWindow = new SessionWindow(endEdgeTime, Long.MAX_VALUE); + } if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow); windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow); @@ -116,6 +135,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends // to the segment store, it will advance stream time past the first segment's retention time and // expire it. nextSegmentWindow = timeWindowForSize(segmentInterval + retention, windowSizeForTimeWindow); + + startEdgeWindow = timeWindowForSize(startEdgeTime, windowSizeForTimeWindow); + endEdgeWindow = timeWindowForSize(endEdgeTime, windowSizeForTimeWindow); } bytesStore = getBytesStore(); @@ -285,8 +307,370 @@ } } + @Test + public void shouldPutAndFetchEdgeSingleKey() { + final String keyA = "a"; + final String keyB = "b"; + + final Bytes serializedKeyAStart = serializeKey(new Windowed<>(keyA, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyAEnd = serializeKey(new Windowed<>(keyA, endEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBStart = serializeKey(new Windowed<>(keyB, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBEnd = serializeKey(new Windowed<>(keyB, endEdgeWindow), false, + Integer.MAX_VALUE); + + bytesStore.put(serializedKeyAStart, serializeValue(10)); + bytesStore.put(serializedKeyAEnd, serializeValue(50)); + bytesStore.put(serializedKeyBStart, serializeValue(100)); + 
bytesStore.put(serializedKeyBEnd, serializeValue(150)); + + // Can fetch start/end edge for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch start/end edge for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyB.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyB.getBytes()), 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldPutAndFetchEdgeKeyRange() { + final String keyA = "a"; + final String keyB = "b"; + + final Bytes serializedKeyAStart = serializeKey(new Windowed<>(keyA, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyAEnd = 
serializeKey(new Windowed<>(keyA, endEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBStart = serializeKey(new Windowed<>(keyB, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBEnd = serializeKey(new Windowed<>(keyB, endEdgeWindow), false, + Integer.MAX_VALUE); + + bytesStore.put(serializedKeyAStart, serializeValue(10)); + bytesStore.put(serializedKeyAEnd, serializeValue(50)); + bytesStore.put(serializedKeyBStart, serializeValue(100)); + bytesStore.put(serializedKeyBEnd, serializeValue(150)); + // Can fetch from start/end for key range + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ) : asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for key range + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0L, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? 
asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ) : asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + assertEquals(expected, toList(values)); + } + + // KeyB should be ignored and KeyA should be included even in storage + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + null, null, 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? 
asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ) : asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch( + null, null, startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ) : asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldPutAndBackwardFetchEdgeSingleKey() { + final String keyA = "a"; + final String keyB = "b"; + + final Bytes serializedKeyAStart = serializeKey(new Windowed<>(keyA, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyAEnd = serializeKey(new Windowed<>(keyA, endEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBStart = serializeKey(new Windowed<>(keyB, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBEnd = serializeKey(new Windowed<>(keyB, endEdgeWindow), false, + Integer.MAX_VALUE); + + bytesStore.put(serializedKeyAStart, serializeValue(10)); + 
bytesStore.put(serializedKeyAEnd, serializeValue(50)); + bytesStore.put(serializedKeyBStart, serializeValue(100)); + bytesStore.put(serializedKeyBEnd, serializeValue(150)); + + // Can fetch start/end edge for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch start/end edge for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyB.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for single key + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyB.getBytes()), 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L) + ); + + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldPutAndBackwardFetchEdgeKeyRange() { + final String keyA = "a"; + final String keyB = "b"; + 
+ final Bytes serializedKeyAStart = serializeKey(new Windowed<>(keyA, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyAEnd = serializeKey(new Windowed<>(keyA, endEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBStart = serializeKey(new Windowed<>(keyB, startEdgeWindow), false, + Integer.MAX_VALUE); + final Bytes serializedKeyBEnd = serializeKey(new Windowed<>(keyB, endEdgeWindow), false, + Integer.MAX_VALUE); + + bytesStore.put(serializedKeyAStart, serializeValue(10)); + bytesStore.put(serializedKeyAEnd, serializeValue(50)); + bytesStore.put(serializedKeyBStart, serializeValue(100)); + bytesStore.put(serializedKeyBEnd, serializeValue(150)); + + // Can fetch from start/end for key range + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ) : asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + assertEquals(expected, toList(values)); + } + + // Can fetch from 0 to max for key range + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0L, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? 
asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ) : asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + assertEquals(expected, toList(values)); + } + + // KeyB should be ignored and KeyA should be included even in storage + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + null, null, 0, Long.MAX_VALUE)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? 
asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ) : asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch( + null, null, startEdgeTime, endEdgeTime)) { + + final List<KeyValue<Windowed<String>, Long>> expected = getIndexSchema() == null ? asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ) : asList( + KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L), + KeyValue.pair(new Windowed<>(keyB, startEdgeWindow), 100L), + KeyValue.pair(new Windowed<>(keyA, endEdgeWindow), 50L), + KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L) + ); + assertEquals(expected, toList(values)); + } + } + @Test public void shouldPutAndFetchWithPrefixKey() { + // Only for TimeFirstWindowKeySchema schema + if (!(getBaseSchema() instanceof TimeFirstWindowKeySchema)) { + return; + } final String keyA = "a"; final String keyB = "aa"; final String keyC = "aaa"; @@ -365,6 +749,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends @Test public void shouldPutAndBackwardFetchWithPrefix() { + // Only for TimeFirstWindowKeySchema schema + if (!(getBaseSchema() instanceof TimeFirstWindowKeySchema)) { + return; + } + final String keyA = "a"; final String keyB = "aa"; final String keyC = "aaa"; @@ -1081,10 +1470,16 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends private Bytes serializeKey(final Windowed<String> key, final boolean changeLog, final int seq) { final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); - if (changeLog) { - return WindowKeySchema.toStoreKeyBinary(key, seq, stateSerdes); - } else if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { + if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { + if (changeLog) { + return WindowKeySchema.toStoreKeyBinary(key, seq, stateSerdes); + } return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq, stateSerdes); + } else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) { + if (changeLog) { + return Bytes.wrap(SessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy")); + } + return Bytes.wrap(TimeFirstSessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy")); } else { throw new IllegalStateException("Unrecognized serde schema"); } @@ -1094,6 +1489,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); if (getIndexSchema() instanceof KeyFirstWindowKeySchema) { return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes); + } else if (getIndexSchema() instanceof KeyFirstSessionKeySchema) { + return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy")); } else { throw new IllegalStateException("Unrecognized serde schema"); } @@ -1119,6 +1516,12 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends stateSerdes.valueDeserializer().deserialize("dummy", next.value) ); results.add(deserialized); + } else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) { + final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair( + TimeFirstSessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), "dummy"), + stateSerdes.valueDeserializer().deserialize("dummy", next.value) + ); + results.add(deserialized); } else { throw new IllegalStateException("Unrecognized serde schema"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index deabea8596..b3a749a8a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.Collection; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Windowed; @@ -27,29 +28,78 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.Stores; import org.junit.Test; -import java.util.Arrays; import java.util.HashSet; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; import static org.junit.Assert.assertEquals; +@RunWith(Parameterized.class) public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { private static final String STORE_NAME = "rocksDB session store"; + enum StoreType { + RocksDBSessionStore, + RocksDBTimeOrderedSessionStoreWithIndex, + RocksDBTimeOrderedSessionStoreWithoutIndex + } + @Parameter + public StoreType storeType; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getKeySchema() { + return 
asList(new Object[][] { + {StoreType.RocksDBSessionStore}, + {StoreType.RocksDBTimeOrderedSessionStoreWithIndex}, + {StoreType.RocksDBTimeOrderedSessionStoreWithoutIndex} + }); + } + @Override <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod, final Serde<K> keySerde, final Serde<V> valueSerde) { - return Stores.sessionStoreBuilder( - Stores.persistentSessionStore( - STORE_NAME, - ofMillis(retentionPeriod)), - keySerde, - valueSerde).build(); + switch (storeType) { + case RocksDBSessionStore: { + return Stores.sessionStoreBuilder( + Stores.persistentSessionStore( + STORE_NAME, + ofMillis(retentionPeriod)), + keySerde, + valueSerde).build(); + } + case RocksDBTimeOrderedSessionStoreWithIndex: { + return Stores.sessionStoreBuilder( + new RocksDbTimeOrderedSessionBytesStoreSupplier( + STORE_NAME, + retentionPeriod, + true + ), + keySerde, + valueSerde + ).build(); + } + case RocksDBTimeOrderedSessionStoreWithoutIndex: { + return Stores.sessionStoreBuilder( + new RocksDbTimeOrderedSessionBytesStoreSupplier( + STORE_NAME, + retentionPeriod, + false + ), + keySerde, + valueSerde + ).build(); + } + default: + throw new IllegalStateException("Unknown StoreType: " + storeType); + } } @Test @@ -64,7 +114,7 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { try (final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) ) { - assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L))); + assertEquals(valuesToSet(iterator), new HashSet<>(asList(2L, 3L, 4L))); } } @@ -72,7 +122,7 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { public void shouldMatchPositionAfterPut() { final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore; final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped(); - final 
RocksDBSessionStore rocksDBSessionStore = (RocksDBSessionStore) changeLoggingSessionBytesStore.wrapped(); + final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) changeLoggingSessionBytesStore.wrapped(); context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStoreTest.java deleted file mode 100644 index 0d5b016a9e..0000000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStoreTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.state.internals; - -import static java.util.Arrays.asList; - -import java.util.Collection; -import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; -import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; -import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; - -@RunWith(Parameterized.class) -public class RocksDBTimeOrderedSegmentedBytesStoreTest - extends AbstractDualSchemaRocksDBSegmentedBytesStoreTest<KeyValueSegment> { - - private final static String METRICS_SCOPE = "metrics-scope"; - - @Parameter - public String name; - - @Parameter(1) - public boolean hasIndex; - - @Parameterized.Parameters(name = "{0}") - public static Collection<Object[]> getKeySchema() { - return asList(new Object[][] { - {"WindowSchemaWithIndex", true}, - {"WindowSchemaWithoutIndex", false} - }); - } - - AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> getBytesStore() { - return new RocksDBTimeOrderedSegmentedBytesStore( - storeName, - METRICS_SCOPE, - retention, - segmentInterval, - hasIndex - ); - } - - @Override - KeyValueSegments newSegments() { - return new KeyValueSegments(storeName, METRICS_SCOPE, retention, segmentInterval); - } - - @Override - KeySchema getBaseSchema() { - return new TimeFirstWindowKeySchema(); - } - - @Override - KeySchema getIndexSchema() { - return hasIndex ? 
new KeyFirstWindowKeySchema() : null; - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStoreTest.java new file mode 100644 index 0000000000..db02f5b6ff --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStoreTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static java.util.Arrays.asList; + +import java.util.Collection; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class RocksDBTimeOrderedWindowSegmentedBytesStoreTest + extends AbstractDualSchemaRocksDBSegmentedBytesStoreTest<KeyValueSegment> { + + private final static String METRICS_SCOPE = "metrics-scope"; + + private enum SchemaType { + WindowSchemaWithIndex, + WindowSchemaWithoutIndex, + SessionSchemaWithIndex, + SessionSchemaWithoutIndex + } + + private boolean hasIndex; + private SchemaType schemaType; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getKeySchema() { + return asList(new Object[][] { + {SchemaType.WindowSchemaWithIndex, true}, + {SchemaType.WindowSchemaWithoutIndex, false}, + {SchemaType.SessionSchemaWithIndex, true}, + {SchemaType.SessionSchemaWithoutIndex, false} + }); + } + + public RocksDBTimeOrderedWindowSegmentedBytesStoreTest(final SchemaType schemaType, final boolean hasIndex) { + this.schemaType = schemaType; + this.hasIndex = hasIndex; + } + + + AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> getBytesStore() { + switch (schemaType) { + case WindowSchemaWithIndex: + case WindowSchemaWithoutIndex: + return new RocksDBTimeOrderedWindowSegmentedBytesStore( + storeName, + METRICS_SCOPE, + retention, + segmentInterval, + hasIndex + ); + case SessionSchemaWithIndex: + case 
SessionSchemaWithoutIndex: + return new RocksDBTimeOrderedSessionSegmentedBytesStore( + storeName, + METRICS_SCOPE, + retention, + segmentInterval, + hasIndex + ); + default: + throw new IllegalStateException("Unknown SchemaType: " + schemaType); + } + } + + @Override + KeyValueSegments newSegments() { + return new KeyValueSegments(storeName, METRICS_SCOPE, retention, segmentInterval); + } + + @Override + KeySchema getBaseSchema() { + switch (schemaType) { + case WindowSchemaWithIndex: + case WindowSchemaWithoutIndex: + return new TimeFirstWindowKeySchema(); + case SessionSchemaWithIndex: + case SessionSchemaWithoutIndex: + return new TimeFirstSessionKeySchema(); + default: + throw new IllegalStateException("Unknown SchemaType: " + schemaType); + } + } + + @Override + KeySchema getIndexSchema() { + if (!hasIndex) { + return null; + } + switch (schemaType) { + case WindowSchemaWithIndex: + return new KeyFirstWindowKeySchema(); + case SessionSchemaWithIndex: + return new KeyFirstSessionKeySchema(); + default: + throw new IllegalStateException("Unknown SchemaType: " + schemaType); + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 5abfd0667d..c0c7e963e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -68,17 +68,14 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { } @Parameter - public String name; - - @Parameter(1) public StoreType storeType; @Parameterized.Parameters(name = "{0}") public static Collection<Object[]> getKeySchema() { return asList(new Object[][] { - {"RocksDBWindowStore", StoreType.RocksDBWindowStore}, - {"RocksDBTimeOrderedWindowStoreWithIndex", StoreType.RocksDBTimeOrderedWindowStoreWithIndex}, - 
{"RocksDBTimeOrderedWindowStoreWithoutIndex", StoreType.RocksDBTimeOrderedWindowStoreWithoutIndex} + {StoreType.RocksDBWindowStore}, + {StoreType.RocksDBTimeOrderedWindowStoreWithIndex}, + {StoreType.RocksDBTimeOrderedWindowStoreWithoutIndex} }); } @@ -88,32 +85,41 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { final boolean retainDuplicates, final Serde<K> keySerde, final Serde<V> valueSerde) { - if (storeType == StoreType.RocksDBWindowStore) { - return Stores.windowStoreBuilder( - Stores.persistentWindowStore( - STORE_NAME, - ofMillis(retentionPeriod), - ofMillis(windowSize), - retainDuplicates), + + switch (storeType) { + case RocksDBWindowStore: { + return Stores.windowStoreBuilder( + Stores.persistentWindowStore( + STORE_NAME, + ofMillis(retentionPeriod), + ofMillis(windowSize), + retainDuplicates), + keySerde, + valueSerde) + .build(); + } + case RocksDBTimeOrderedWindowStoreWithIndex: { + final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); + return Stores.windowStoreBuilder( + new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, + retentionPeriod, defaultSegmentInterval, windowSize, retainDuplicates, + true), keySerde, - valueSerde) - .build(); - } else if (storeType == StoreType.RocksDBTimeOrderedWindowStoreWithIndex) { - final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); - return Stores.windowStoreBuilder( - new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, - retentionPeriod, defaultSegmentInterval, windowSize, retainDuplicates, true), - keySerde, - valueSerde - ).build(); - } else { - final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); - return Stores.windowStoreBuilder( - new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, - retentionPeriod, defaultSegmentInterval, windowSize, retainDuplicates, false), - keySerde, - valueSerde - ).build(); + valueSerde + ).build(); + } + case 
RocksDBTimeOrderedWindowStoreWithoutIndex: { + final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); + return Stores.windowStoreBuilder( + new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, + retentionPeriod, defaultSegmentInterval, windowSize, retainDuplicates, + false), + keySerde, + valueSerde + ).build(); + } + default: + throw new IllegalStateException("Unknown StoreType: " + storeType); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java index ed1bbb8fd4..fad4cc5a47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java @@ -60,8 +60,8 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest { final WindowStore store = RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), ofMillis(1L), false, true).get(); final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class)); - assertThat(wrapped, instanceOf(RocksDBTimeOrderedSegmentedBytesStore.class)); - assertTrue(((RocksDBTimeOrderedSegmentedBytesStore) wrapped).hasIndex()); + assertThat(wrapped, instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class)); + assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore) wrapped).hasIndex()); } @Test @@ -69,7 +69,7 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest { final WindowStore store = RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), ofMillis(1L), false, false).get(); final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, 
instanceOf(RocksDBTimeOrderedWindowStore.class)); - assertThat(wrapped, instanceOf(RocksDBTimeOrderedSegmentedBytesStore.class)); - assertFalse(((RocksDBTimeOrderedSegmentedBytesStore) wrapped).hasIndex()); + assertThat(wrapped, instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class)); + assertFalse(((RocksDBTimeOrderedWindowSegmentedBytesStore) wrapped).hasIndex()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index 0482f01ba5..8b5391a7cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -17,30 +17,101 @@ package org.apache.kafka.streams.state.internals; +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; import org.apache.kafka.test.KeyValueIteratorStub; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +@RunWith(Parameterized.class) public class SessionKeySchemaTest { + private static final Map<SchemaType, KeySchema> SCHEMA_TYPE_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, new SessionKeySchema()), + mkEntry(SchemaType.PrefixedKeyFirstSchema, new KeyFirstSessionKeySchema()), + mkEntry(SchemaType.PrefixedTimeFirstSchema, new TimeFirstSessionKeySchema()) + ); + + private static final Map<SchemaType, Function<Windowed<Bytes>, Bytes>> WINDOW_TO_STORE_BINARY_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::toBinary), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::toBinary), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::toBinary) + ); + + private static final Map<SchemaType, Function<byte[], Long>> EXTRACT_END_TS_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::extractEndTimestamp), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::extractEndTimestamp), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::extractEndTimestamp) + ); + + private static final Map<SchemaType, Function<byte[], Long>> EXTRACT_START_TS_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::extractStartTimestamp), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::extractStartTimestamp), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::extractStartTimestamp) + ); + + @FunctionalInterface + interface TriFunction<A, B, C, R> { + R apply(A a, B 
b, C c); + } + + private static final Map<SchemaType, TriFunction<Windowed<String>, Serializer<String>, String, byte[]>> SERDE_TO_STORE_BINARY_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::toBinary), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::toBinary), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::toBinary) + ); + + private static final Map<SchemaType, TriFunction<byte[], Deserializer<String>, String, Windowed<String>>> SERDE_FROM_BYTES_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::from), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::from), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::from) + ); + + private static final Map<SchemaType, Function<Bytes, Windowed<Bytes>>> FROM_BYTES_MAP = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::from), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::from), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::from) + ); + + private static final Map<SchemaType, Function<byte[], Window>> EXTRACT_WINDOW = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::extractWindow), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::extractWindow), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::extractWindow) + ); + + private static final Map<SchemaType, Function<byte[], byte[]>> EXTRACT_KEY_BYTES = mkMap( + mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::extractKeyBytes), + mkEntry(SchemaType.PrefixedKeyFirstSchema, KeyFirstSessionKeySchema::extractKeyBytes), + mkEntry(SchemaType.PrefixedTimeFirstSchema, TimeFirstSessionKeySchema::extractKeyBytes) + ); private final String key = "key"; private final String topic = "topic"; @@ -52,8 +123,45 @@ public class SessionKeySchemaTest { private final Windowed<String> windowedKey = new Windowed<>(key, window); private final 
Serde<Windowed<String>> keySerde = new WindowedSerdes.SessionWindowedSerde<>(serde); - private final SessionKeySchema sessionKeySchema = new SessionKeySchema(); + private final KeySchema keySchema; private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator; + private final SchemaType schemaType; + private final Function<Windowed<Bytes>, Bytes> toBinary; + private final TriFunction<Windowed<String>, Serializer<String>, String, byte[]> serdeToBinary; + private final TriFunction<byte[], Deserializer<String>, String, Windowed<String>> serdeFromBytes; + private final Function<Bytes, Windowed<Bytes>> fromBytes; + private final Function<byte[], Long> extractStartTS; + private final Function<byte[], Long> extractEndTS; + private final Function<byte[], byte[]> extractKeyBytes; + private final Function<byte[], Window> extractWindow; + + private enum SchemaType { + SessionKeySchema, + PrefixedTimeFirstSchema, + PrefixedKeyFirstSchema + } + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return asList(new Object[][] { + {SchemaType.SessionKeySchema}, + {SchemaType.PrefixedTimeFirstSchema}, + {SchemaType.PrefixedKeyFirstSchema} + }); + } + + public SessionKeySchemaTest(final SchemaType type) { + schemaType = type; + keySchema = SCHEMA_TYPE_MAP.get(type); + toBinary = WINDOW_TO_STORE_BINARY_MAP.get(schemaType); + serdeToBinary = SERDE_TO_STORE_BINARY_MAP.get(schemaType); + serdeFromBytes = SERDE_FROM_BYTES_MAP.get(schemaType); + fromBytes = FROM_BYTES_MAP.get(schemaType); + extractStartTS = EXTRACT_START_TS_MAP.get(schemaType); + extractEndTS = EXTRACT_END_TS_MAP.get(schemaType); + extractKeyBytes = EXTRACT_KEY_BYTES.get(schemaType); + extractWindow = EXTRACT_WINDOW.get(schemaType); + } @After public void after() { @@ -64,44 +172,44 @@ public class SessionKeySchemaTest { @Before public void before() { - final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new 
byte[]{0, 0}), new SessionWindow(0, 0))), 1), - KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), - KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), - KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4), - KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5), - KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6)); + final List<KeyValue<Bytes, Integer>> keys = asList(KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), + KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), + KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), + KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4), + KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5), + KeyValue.pair(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6)); iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator())); } @Test public void shouldFetchExactKeysSkippingLongerKeys() { final Bytes key = Bytes.wrap(new byte[]{0}); - final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE, true)); - assertThat(result, equalTo(Arrays.asList(2, 4))); + final List<Integer> result = getValues(keySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE, true)); + assertThat(result, equalTo(asList(2, 4))); } @Test public void shouldFetchExactKeySkippingShorterKeys() { final Bytes key = Bytes.wrap(new byte[]{0, 0}); - final 
HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE, true); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE, true); final List<Integer> results = getValues(hasNextCondition); - assertThat(results, equalTo(Arrays.asList(1, 5))); + assertThat(results, equalTo(asList(1, 5))); } @Test public void shouldFetchAllKeysUsingNullKeys() { - final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true); final List<Integer> results = getValues(hasNextCondition); - assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6))); + assertThat(results, equalTo(asList(1, 2, 3, 4, 5, 6))); } @Test public void testUpperBoundWithLargeTimestamps() { - final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); + final Bytes upper = keySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(SessionKeySchema.toBinary( + upper.compareTo(toBinary.apply( new Windowed<>( Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) @@ -110,7 +218,7 @@ public class SessionKeySchemaTest { assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(SessionKeySchema.toBinary( + upper.compareTo(toBinary.apply( new Windowed<>( Bytes.wrap(new byte[]{0xA, 0xB}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) @@ -118,18 +226,26 @@ public class SessionKeySchemaTest { )) >= 0 ); - assertThat(upper, equalTo(SessionKeySchema.toBinary( - new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) - ); + if (schemaType == SchemaType.PrefixedTimeFirstSchema) { + assertThat(upper, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new 
byte[]{0xA, 0xB, 0xC}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) + ); + } else { + assertThat(upper, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new byte[]{0xA}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) + ); + } } @Test public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() { - final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE); + final Bytes upper = keySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE); assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(SessionKeySchema.toBinary( + upper.compareTo(toBinary.apply( new Windowed<>( Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) @@ -137,40 +253,53 @@ public class SessionKeySchemaTest { ) >= 0 ); - assertThat(upper, equalTo(SessionKeySchema.toBinary( + assertThat(upper, equalTo(toBinary.apply( new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) ); } @Test public void testUpperBoundWithZeroTimestamp() { - final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - - assertThat(upper, equalTo(SessionKeySchema.toBinary( - new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE)))) - ); + final Bytes upper = keySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + final Function<Windowed<Bytes>, Bytes> toBinary = WINDOW_TO_STORE_BINARY_MAP.get(schemaType); + + if (schemaType == SchemaType.PrefixedTimeFirstSchema) { + assertThat(upper, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, Long.MAX_VALUE)))) + ); + } else { + assertThat(upper, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE)))) + ); + } } @Test public void testLowerBoundWithZeroTimestamp() 
{ - final Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); + final Bytes lower = keySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); + assertThat(lower, equalTo(toBinary.apply(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); } @Test public void testLowerBoundMatchesTrailingZeros() { - final Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); + final Bytes lower = keySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE); assertThat( "appending zeros to key should still be in range", - lower.compareTo(SessionKeySchema.toBinary( + lower.compareTo(toBinary.apply( new Windowed<>( Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) )) < 0 ); - assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); + if (schemaType == SchemaType.PrefixedTimeFirstSchema) { + assertThat(lower, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, Long.MAX_VALUE))))); + } else { + assertThat(lower, equalTo(toBinary.apply( + new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); + } } @Test @@ -197,47 +326,47 @@ public class SessionKeySchemaTest { @Test public void shouldConvertToBinaryAndBack() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - final Windowed<String> result = SessionKeySchema.from(serialized, Serdes.String().deserializer(), "dummy"); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + final Windowed<String> result = serdeFromBytes.apply(serialized, 
Serdes.String().deserializer(), "dummy"); assertEquals(windowedKey, result); } @Test public void shouldExtractEndTimeFromBinary() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - assertEquals(endTime, SessionKeySchema.extractEndTimestamp(serialized)); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + assertEquals(endTime, (long) extractEndTS.apply(serialized)); } @Test public void shouldExtractStartTimeFromBinary() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - assertEquals(startTime, SessionKeySchema.extractStartTimestamp(serialized)); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + assertEquals(startTime, (long) extractStartTS.apply(serialized)); } @Test public void shouldExtractWindowFromBindary() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - assertEquals(window, SessionKeySchema.extractWindow(serialized)); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + assertEquals(window, extractWindow.apply(serialized)); } @Test public void shouldExtractKeyBytesFromBinary() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - assertArrayEquals(key.getBytes(), SessionKeySchema.extractKeyBytes(serialized)); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + assertArrayEquals(key.getBytes(), extractKeyBytes.apply(serialized)); } @Test public void shouldExtractKeyFromBinary() { - final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy"); - assertEquals(windowedKey, SessionKeySchema.from(serialized, serde.deserializer(), "dummy")); + final byte[] serialized = serdeToBinary.apply(windowedKey, serde.serializer(), "dummy"); + assertEquals(windowedKey, 
serdeFromBytes.apply(serialized, serde.deserializer(), "dummy")); } @Test public void shouldExtractBytesKeyFromBinary() { final Bytes bytesKey = Bytes.wrap(key.getBytes()); final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window); - final Bytes serialized = SessionKeySchema.toBinary(windowedBytesKey); - assertEquals(windowedBytesKey, SessionKeySchema.from(serialized)); + final Bytes serialized = toBinary.apply(windowedBytesKey); + assertEquals(windowedBytesKey, fromBytes.apply(serialized)); } private List<Integer> getValues(final HasNextCondition hasNextCondition) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index b9ef24d3c6..bf597fb789 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -102,7 +102,7 @@ public class TimeOrderedWindowStoreTest { private static final String CACHE_NAMESPACE = "0_0-store-name"; private InternalMockProcessorContext context; - private RocksDBTimeOrderedSegmentedBytesStore bytesStore; + private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore; private WindowStore<Bytes, byte[]> underlyingStore; private TimeOrderedCachingWindowStore cachingStore; private CacheFlushListenerStub<Windowed<String>, String> cacheListener; @@ -123,7 +123,7 @@ public class TimeOrderedWindowStoreTest { @Before public void setUp() { baseKeySchema = new TimeFirstWindowKeySchema(); - bytesStore = new RocksDBTimeOrderedSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, hasIndex); + bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, hasIndex); underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, WINDOW_SIZE); final 
TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); keyDeserializer.setIsChangelogTopic(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java index e9360534a8..4729a73f14 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java @@ -130,7 +130,7 @@ public class WindowKeySchemaTest { final private KeySchema keySchema; final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde, Long.MAX_VALUE); final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray()); - final private SchemaType schemaType; + final public SchemaType schemaType; private enum SchemaType { WindowKeySchema, @@ -141,13 +141,13 @@ public class WindowKeySchemaTest { @Parameterized.Parameters(name = "{0}") public static Collection<Object[]> data() { return asList(new Object[][] { - {"WindowKeySchema", SchemaType.WindowKeySchema}, - {"PrefixedTimeFirstSchema", SchemaType.PrefixedTimeFirstSchema}, - {"PrefixedKeyFirstSchema", SchemaType.PrefixedKeyFirstSchema} + {SchemaType.WindowKeySchema}, + {SchemaType.PrefixedTimeFirstSchema}, + {SchemaType.PrefixedKeyFirstSchema} }); } - public WindowKeySchemaTest(final String name, final SchemaType type) { + public WindowKeySchemaTest(final SchemaType type) { schemaType = type; keySchema = SCHEMA_TYPE_MAP.get(type); }