This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8649717 KAFKA-6521: Use timestamped stores for KTables (#6667) 8649717 is described below commit 8649717d6dde081c75fc441a56f63ee1556dc758 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Sun May 12 11:50:55 2019 +0200 KAFKA-6521: Use timestamped stores for KTables (#6667) Reviewers: John Roesler <j...@confluent.io>, Boyang Chen <boy...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../kstream/internals/KGroupedStreamImpl.java | 2 +- .../kstream/internals/KGroupedTableImpl.java | 2 +- .../kstream/internals/KStreamAggregate.java | 38 +++-- .../streams/kstream/internals/KStreamReduce.java | 37 ++-- .../kstream/internals/KStreamWindowAggregate.java | 40 ++--- .../streams/kstream/internals/KTableAggregate.java | 22 ++- .../streams/kstream/internals/KTableFilter.java | 22 ++- .../streams/kstream/internals/KTableImpl.java | 17 +- .../kstream/internals/KTableKTableJoinMerger.java | 17 +- .../streams/kstream/internals/KTableMapValues.java | 22 ++- .../KTableMaterializedValueGetterSupplier.java | 14 +- .../streams/kstream/internals/KTableReduce.java | 22 ++- .../streams/kstream/internals/KTableSource.java | 22 ++- .../internals/KTableSourceValueGetterSupplier.java | 16 +- .../kstream/internals/KTableTransformValues.java | 29 ++-- .../internals/SessionWindowedKStreamImpl.java | 3 +- .../kstream/internals/TimeWindowedKStreamImpl.java | 16 +- .../internals/TimestampedCacheFlushListener.java | 53 ++++++ ...a => TimestampedKeyValueStoreMaterializer.java} | 11 +- ...rwarder.java => TimestampedTupleForwarder.java} | 25 +-- .../streams/kstream/internals/TupleForwarder.java | 15 +- .../internals/graph/KTableKTableJoinNode.java | 25 +-- .../internals/graph/TableProcessorNode.java | 8 +- .../kstream/internals/graph/TableSourceNode.java | 9 +- .../org/apache/kafka/streams/processor/To.java | 23 +++ .../internals/GlobalProcessorContextImpl.java | 10 +- .../processor/internals/ProcessorContextImpl.java | 6 +- .../kafka/streams/state/ValueAndTimestamp.java | 14 +- .../state/internals/CachingKeyValueStore.java | 6 +- .../internals/MeteredTimestampedKeyValueStore.java | 2 +- .../internals/TimestampedKeyValueStoreBuilder.java | 100 ++++++++++- .../internals/TimestampedWindowStoreBuilder.java | 107 +++++++++++- .../kstream/internals/KTableReduceTest.java | 14 +- .../internals/KTableTransformValuesTest.java | 6 +- .../TimestampedCacheFlushListenerTest.java | 75 ++++++++ ...est.java => TimestampedTupleForwarderTest.java} | 31 ++-- .../kstream/internals/TupleForwarderTest.java | 19 ++- .../internals/GlobalStreamThreadTest.java | 4 +- .../processor/internals/StandbyTaskTest.java | 30 ++-- ... TimestampedKeyValueStoreMaterializerTest.java} | 50 +++--- .../org/apache/kafka/streams/state/StoresTest.java | 32 ++++ .../TimestampedKeyValueStoreBuilderTest.java | 8 +- .../TimestampedWindowStoreBuilderTest.java | 7 +- .../GenericInMemoryTimestampedKeyValueStore.java | 190 +++++++++++++++++++++ 44 files changed, 946 insertions(+), 275 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index eab1e8f..6f6521c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -185,7 +185,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { return aggregateBuilder.build( functionName, - new KeyValueStoreMaterializer<>(materializedInternal).materialize(), + new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(), aggregateSupplier, materializedInternal.queryableStoreName(), materializedInternal.keySerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 56be0f6..75c998a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -88,7 +88,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( funcName, new ProcessorParameters<>(aggregateSupplier, funcName), - new KeyValueStoreMaterializer<>(materialized).materialize() + new TimestampedKeyValueStoreMaterializer<>(materialized).materialize() ); // now the repartition node must be the parent of the StateProcessorNode diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 4ead76b..7d367d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -22,17 +22,19 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> { private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class); private final String storeName; private final Initializer<T> initializer; private final Aggregator<? super K, ? super V, T> aggregator; - private boolean sendOldValues = false; KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) { @@ -51,22 +53,25 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, sendOldValues = true; } - private class KStreamAggregateProcessor extends AbstractProcessor<K, V> { - private KeyValueStore<K, T> store; + private class KStreamAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedKeyValueStore<K, T> store; private StreamsMetricsImpl metrics; - private TupleForwarder<K, T> tupleForwarder; + private TimestampedTupleForwarder<K, T> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); - store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } - @Override public void process(final K key, final V value) { // If the key or value is null we don't need to proceed @@ -79,7 +84,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, return; } - T oldAgg = store.get(key); + final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key); + T oldAgg = getValueOrNull(oldAggAndTimestamp); if (oldAgg == null) { oldAgg = initializer.apply(); @@ -91,14 +97,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, newAgg = aggregator.apply(key, value, newAgg); // update the store with the new value - store.put(key, newAgg); + store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp())); tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } @Override public KTableValueGetterSupplier<K, T> view() { - return new KTableValueGetterSupplier<K, T>() { public KTableValueGetter<K, T> get() { @@ -112,23 +117,22 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, }; } - private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> { - private KeyValueStore<K, T> store; + private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> { + private TimestampedKeyValueStore<K, T> store; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { - store = (KeyValueStore<K, T>) context.getStateStore(storeName); + store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName); } @Override public T get(final K key) { - return store.get(key); + return getValueOrNull(store.get(key)); } @Override - public void close() { - } + public void close() {} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index cd67283..a1b9f73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -21,10 +21,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> { private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class); @@ -48,10 +51,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, sendOldValues = true; } - private class KStreamReduceProcessor extends AbstractProcessor<K, V> { - private KeyValueStore<K, V> store; - private TupleForwarder<K, V> tupleForwarder; + private class KStreamReduceProcessor extends AbstractProcessor<K, V> { + private TimestampedKeyValueStore<K, V> store; + private TimestampedTupleForwarder<K, V> tupleForwarder; private StreamsMetricsImpl metrics; @SuppressWarnings("unchecked") @@ -59,11 +62,14 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); - store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } - @Override public void process(final K key, final V value) { // If the key or value is null we don't need to proceed @@ -76,7 +82,8 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, return; } - final V oldAgg = store.get(key); + final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key); + final V oldAgg = getValueOrNull(oldAggAndTimestamp); V newAgg = oldAgg; // try to add the new value @@ -87,14 +94,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, } // update the store with the new value - store.put(key, newAgg); + store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp())); tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } @Override public KTableValueGetterSupplier<K, V> view() { - return new KTableValueGetterSupplier<K, V>() { public KTableValueGetter<K, V> get() { @@ -108,24 +114,23 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, }; } - private class KStreamReduceValueGetter implements KTableValueGetter<K, V> { - private KeyValueStore<K, V> store; + private class KStreamReduceValueGetter implements KTableValueGetter<K, V> { + private TimestampedKeyValueStore<K, V> store; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { - store = (KeyValueStore<K, V>) context.getStateStore(storeName); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); } @Override public V get(final K key) { - return store.get(key); + return getValueOrNull(store.get(key)); } @Override - public void close() { - } + public void close() {} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index d09dadd..95ae54f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,12 +29,15 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -69,10 +72,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr sendOldValues = true; } - private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> { - private WindowStore<K, Agg> windowStore; - private TupleForwarder<Windowed<K>, Agg> tupleForwarder; + private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; @@ -83,13 +86,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr public void init(final ProcessorContext context) { super.init(context); internalProcessorContext = (InternalProcessorContext) context; - metrics = (StreamsMetricsImpl) context.metrics(); - lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); - - windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } @Override @@ -115,7 +119,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr final Long windowStart = entry.getKey(); final long windowEnd = entry.getValue().end(); if (windowEnd > closeTime) { - Agg oldAgg = windowStore.fetch(key, windowStart); + final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart); + Agg oldAgg = getValueOrNull(oldAggAndTimestamp); if (oldAgg == null) { oldAgg = initializer.apply(); @@ -124,7 +129,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr final Agg newAgg = aggregator.apply(key, value, oldAgg); // update the store with the new value - windowStore.put(key, newAgg, windowStart); + windowStore.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()), windowStart); tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null); } else { log.debug( @@ -154,7 +159,6 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr @Override public KTableValueGetterSupplier<Windowed<K>, Agg> view() { - return new KTableValueGetterSupplier<Windowed<K>, Agg>() { public KTableValueGetter<Windowed<K>, Agg> get() { @@ -168,14 +172,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr }; } - private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> { - private WindowStore<K, Agg> windowStore; + private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> { + private TimestampedWindowStore<K, Agg> windowStore; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { - windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); } @SuppressWarnings("unchecked") @@ -183,12 +187,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr public Agg get(final Windowed<K> windowedKey) { final K key = windowedKey.key(); final W window = (W) windowedKey.window(); - - return windowStore.fetch(key, window.start()); + return getValueOrNull(windowStore.fetch(key, window.start())); } @Override - public void close() { - } + public void close() {} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 1c44486..88bf867 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -22,7 +22,10 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> { @@ -54,15 +57,19 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T } private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, T> store; - private TupleForwarder<K, T> tupleForwarder; + private TimestampedKeyValueStore<K, T> store; + private TimestampedTupleForwarder<K, T> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); - store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } /** @@ -75,7 +82,8 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null."); } - final T oldAgg = store.get(key); + final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key); + final T oldAgg = getValueOrNull(oldAggAndTimestamp); final T intermediateAgg; // first try to remove the old value @@ -101,7 +109,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T } // update the store with the new value - store.put(key, newAgg); + store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp())); tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 2410074..afa1822 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -20,10 +20,10 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { - private final KTableImpl<K, ?, V> parent; private final Predicate<? super K, ? super V> predicate; private final boolean filterNot; @@ -61,18 +61,22 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { return newValue; } + private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, V> store; - private TupleForwarder<K, V> tupleForwarder; + private TimestampedKeyValueStore<K, V> store; + private TimestampedTupleForwarder<K, V> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { - store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, - new ForwardingCacheFlushListener<>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } } @@ -86,13 +90,12 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } if (queryableName != null) { - store.put(key, newValue); + store.put(key, ValueAndTimestamp.make(newValue, context().timestamp())); tupleForwarder.maybeForward(key, newValue, oldValue); } else { context().forward(key, new Change<>(newValue, oldValue)); } } - } @Override @@ -117,6 +120,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } } + private class KTableFilterValueGetter implements KTableValueGetter<K, V> { private final KTableValueGetter<K, V> parentGetter; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index c86f6ce..3a4994f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +118,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final Serde<K> keySerde; final Serde<V> valueSerde; final String queryableStoreName; - final StoreBuilder<KeyValueStore<K, V>> storeBuilder; + final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder; if (materializedInternal != null) { // we actually do not need to generate store names at all since if it is not specified, we will not @@ -132,7 +133,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde; queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name - storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; } else { keySerde = this.keySerde; valueSerde = this.valSerde; @@ -204,7 +205,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final Serde<K> keySerde; final Serde<VR> valueSerde; final String queryableStoreName; - final StoreBuilder<KeyValueStore<K, VR>> storeBuilder; + final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder; if (materializedInternal != null) { // we actually do not need to generate store names at all since if it is not specified, we will not @@ -216,7 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< valueSerde = materializedInternal.valueSerde(); queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name - storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; } else { keySerde = this.keySerde; valueSerde = null; @@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final Serde<K> keySerde; final Serde<VR> valueSerde; final String queryableStoreName; - final StoreBuilder<KeyValueStore<K, VR>> storeBuilder; + final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder; if (materializedInternal != null) { // don't inherit parent value serde, since this operation may change the value type, more specifically: @@ -322,7 +323,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< valueSerde = materializedInternal.valueSerde(); queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name - storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; } else { keySerde = this.keySerde; valueSerde = null; @@ -538,13 +539,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final Serde<K> keySerde; final Serde<VR> valueSerde; final String queryableStoreName; - final StoreBuilder<KeyValueStore<K, VR>> storeBuilder; + final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder; if (materializedInternal != null) { keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde; valueSerde = materializedInternal.valueSerde(); queryableStoreName = materializedInternal.storeName(); - storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal).materialize(); + storeBuilder = new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(); } else { keySerde = this.keySerde; valueSerde = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index de38042..86088c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -19,7 +19,8 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import java.util.Collections; import java.util.HashSet; @@ -94,17 +95,19 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, } private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, V> store; - private TupleForwarder<K, V> tupleForwarder; + private TimestampedKeyValueStore<K, V> store; + private TimestampedTupleForwarder<K, V> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { - store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, - new ForwardingCacheFlushListener<K, V>(context), + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), sendOldValues); } } @@ -112,7 +115,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, @Override public void process(final K key, final Change<V> value) { if (queryableName != null) { - store.put(key, value.newValue); + store.put(key, ValueAndTimestamp.make(value.newValue, context().timestamp())); tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null); } else { if (sendOldValues) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index aae1437..496127c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -20,11 +20,11 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { - private final KTableImpl<K, ?, V> parent; private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper; private final String queryableName; @@ -81,17 +81,22 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { return newValue; } + private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, V1> store; - private TupleForwarder<K, V1> tupleForwarder; + private TimestampedKeyValueStore<K, V1> store; + private TimestampedTupleForwarder<K, V1> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { - store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<K, V1>(context), + sendOldValues); } } @@ -101,7 +106,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null; if (queryableName != null) { - store.put(key, newValue); + store.put(key, ValueAndTimestamp.make(newValue, context().timestamp())); tupleForwarder.maybeForward(key, newValue, oldValue); } else { context().forward(key, new Change<>(newValue, oldValue)); @@ -109,8 +114,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { } } - private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> { + private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> { private final KTableValueGetter<K, V> parentGetter; KTableMapValuesValueGetter(final KTableValueGetter<K, V> parentGetter) { @@ -127,7 +132,6 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { return computeValue(key, parentGetter.get(key)); } - @Override public void close() { parentGetter.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java index 0c17d59..a84251c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java @@ -18,10 +18,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; -public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; +public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { private final String storeName; KTableMaterializedValueGetterSupplier(final String storeName) { @@ -38,21 +39,20 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG } private class KTableMaterializedValueGetter implements KTableValueGetter<K, V> { - private KeyValueStore<K, V> store; + private TimestampedKeyValueStore<K, V> store; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { - store = (KeyValueStore<K, V>) context.getStateStore(storeName); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); } @Override public V get(final K key) { - return store.get(key); + return getValueOrNull(store.get(key)); } @Override - public void close() { - } + public void close() {} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index 70db6443..3055f51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -21,7 +21,10 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -49,15 +52,19 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, V> store; - private TupleForwarder<K, V> tupleForwarder; + private TimestampedKeyValueStore<K, V> store; + private TimestampedTupleForwarder<K, V> tupleForwarder; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); - store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } /** @@ -70,7 +77,8 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null."); } - final V oldAgg = store.get(key); + final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key); + final V oldAgg = getValueOrNull(oldAggAndTimestamp); final V intermediateAgg; // first try to remove the old value @@ -93,7 +101,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { } // update the store with the new value - store.put(key, newAgg); + store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp())); tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index 6fc57bc..8b6ec6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + public class KTableSource<K, V> implements ProcessorSupplier<K, V> { private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class); @@ -66,8 +69,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { private class KTableSourceProcessor extends AbstractProcessor<K, V> { - private KeyValueStore<K, V> store; - private TupleForwarder<K, V> tupleForwarder; + private TimestampedKeyValueStore<K, V> store; + private TimestampedTupleForwarder<K, V> tupleForwarder; private StreamsMetricsImpl metrics; @SuppressWarnings("unchecked") @@ -76,8 +79,12 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); if (queryableName != null) { - store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } } @@ -94,8 +101,9 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { } if (queryableName != null) { - final V oldValue = sendOldValues ? store.get(key) : null; - store.put(key, value); + final ValueAndTimestamp<V> oldValueAndTimestamp = sendOldValues ? store.get(key) : null; + final V oldValue = getValueOrNull(oldValueAndTimestamp); + store.put(key, ValueAndTimestamp.make(value, context().timestamp())); tupleForwarder.maybeForward(key, value, oldValue); } else { context().forward(key, new Change<>(value, null)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java index 6882dac..5ec33f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java @@ -17,10 +17,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; -public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; +public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { private final String storeName; KTableSourceValueGetterSupplier(final String storeName) { @@ -37,21 +38,18 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS } private class KTableSourceValueGetter implements KTableValueGetter<K, V> { - - ReadOnlyKeyValueStore<K, V> store = null; + TimestampedKeyValueStore<K, V> store = null; @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { - store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName); + store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); } public V get(final K key) { - return store.get(key); + return getValueOrNull(store.get(key)); } @Override - public void close() { - } + public void close() {} } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index 88cea4f..5d81711 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -22,12 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import java.util.Objects; -class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; +class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { private final KTableImpl<K, ?, V> parent; private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier; private final String queryableName; @@ -74,10 +76,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V sendOldValues = true; } + private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> { private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer; - private KeyValueStore<K, V1> store; - private TupleForwarder<K, V1> tupleForwarder; + private TimestampedKeyValueStore<K, V1> store; + private TimestampedTupleForwarder<K, V1> tupleForwarder; private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) { this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer"); @@ -87,13 +90,14 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V @Override public void init(final ProcessorContext context) { super.init(context); - valueTransformer.init(new ForwardingDisabledProcessorContext(context)); - if (queryableName != null) { - final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context); - store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues); + store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName); + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); } } @@ -105,8 +109,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null; context().forward(key, new Change<>(newValue, oldValue)); } else { - final V1 oldValue = sendOldValues ? store.get(key) : null; - store.put(key, newValue); + final V1 oldValue = sendOldValues ? getValueOrNull(store.get(key)) : null; + store.put(key, ValueAndTimestamp.make(newValue, context().timestamp())); tupleForwarder.maybeForward(key, newValue, oldValue); } } @@ -117,8 +121,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V } } - private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> { + private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> { private final KTableValueGetter<K, V> parentGetter; private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer; @@ -131,7 +135,6 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V @Override public void init(final ProcessorContext context) { parentGetter.init(context); - valueTransformer.init(new ForwardingDisabledProcessorContext(context)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 8c731be..2106b1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import java.time.Duration; import java.util.Objects; import java.util.Set; @@ -193,7 +194,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple } supplier = Stores.persistentSessionStore( materialized.storeName(), - retentionPeriod + Duration.ofMillis(retentionPeriod) ); } final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index f549ce9..a257603 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -31,8 +31,10 @@ import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import java.time.Duration; import java.util.Objects; @@ -154,7 +156,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr } @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode - private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) { + private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) { WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { if (materialized.retention() != null) { @@ -169,7 +171,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr + " retention=[" + retentionPeriod + "]"); } - supplier = Stores.persistentWindowStore( + supplier = Stores.persistentTimestampedWindowStore( materialized.storeName(), Duration.ofMillis(retentionPeriod), Duration.ofMillis(windows.size()), @@ -190,16 +192,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr + " retention=[" + windows.maintainMs() + "]"); } - supplier = Stores.persistentWindowStore( + supplier = new RocksDbWindowBytesStoreSupplier( materialized.storeName(), windows.maintainMs(), - windows.segments, + Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L), windows.size(), - false - ); + false, + true); } } - final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder( + final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder( supplier, materialized.keySerde(), materialized.valueSerde() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java new file mode 100644 index 0000000..5540376 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.CacheFlushListener; + +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> { + private final InternalProcessorContext context; + private final ProcessorNode myNode; + + TimestampedCacheFlushListener(final ProcessorContext context) { + this.context = (InternalProcessorContext) context; + myNode = this.context.currentNode(); + } + + @Override + public void apply(final K key, + final ValueAndTimestamp<V> newValue, + final ValueAndTimestamp<V> oldValue, + final long timestamp) { + final ProcessorNode prev = context.currentNode(); + context.setCurrentNode(myNode); + try { + context.forward( + key, + new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)), + To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp)); + } finally { + context.setCurrentNode(prev); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java similarity index 79% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java index 67872be..fb40b46 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java @@ -21,24 +21,25 @@ import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; -public class KeyValueStoreMaterializer<K, V> { +public class TimestampedKeyValueStoreMaterializer<K, V> { private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized; - public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + public TimestampedKeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { this.materialized = materialized; } /** * @return StoreBuilder */ - public StoreBuilder<KeyValueStore<K, V>> materialize() { + public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() { KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { final String name = materialized.storeName(); - supplier = Stores.persistentKeyValueStore(name); + supplier = Stores.persistentTimestampedKeyValueStore(name); } - final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder( + final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder( supplier, materialized.keySerde(), materialized.valueSerde()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java similarity index 74% copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java index 323e198..ab2b506 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java @@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; * Forwarding by this class only occurs when caching is not enabled. If caching is enabled, * forwarding occurs in the flush listener when the cached store flushes. * - * @param <K> - * @param <V> + * @param <K> the type of the key + * @param <V> the type of the value */ -class TupleForwarder<K, V> { - private final boolean cachingEnabled; +class TimestampedTupleForwarder<K, V> { private final ProcessorContext context; + private final boolean sendOldValues; + private final boolean cachingEnabled; @SuppressWarnings("unchecked") - TupleForwarder(final StateStore store, - final ProcessorContext context, - final ForwardingCacheFlushListener<K, V> flushListener, - final boolean sendOldValues) { - cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); + TimestampedTupleForwarder(final StateStore store, + final ProcessorContext context, + final TimestampedCacheFlushListener<K, V> flushListener, + final boolean sendOldValues) { this.context = context; + this.sendOldValues = sendOldValues; + cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } public void maybeForward(final K key, final V newValue, final V oldValue) { - if (cachingEnabled) { - return; + if (!cachingEnabled) { + context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null)); } - context.forward(key, new Change<>(newValue, oldValue)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index 323e198..94b0ebd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; * Forwarding by this class only occurs when caching is not enabled. If caching is enabled, * forwarding occurs in the flush listener when the cached store flushes. * - * @param <K> - * @param <V> + * @param <K> the type of the key + * @param <V> the type of the value */ class TupleForwarder<K, V> { - private final boolean cachingEnabled; private final ProcessorContext context; + private final boolean sendOldValues; + private final boolean cachingEnabled; @SuppressWarnings("unchecked") TupleForwarder(final StateStore store, final ProcessorContext context, final ForwardingCacheFlushListener<K, V> flushListener, final boolean sendOldValues) { - cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); this.context = context; + this.sendOldValues = sendOldValues; + cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } public void maybeForward(final K key, final V newValue, final V oldValue) { - if (cachingEnabled) { - return; + if (!cachingEnabled) { + context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null)); } - context.forward(key, new Change<>(newValue, oldValue)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java index 03bdda0..542726b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java @@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger; import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import java.util.Arrays; @@ -36,7 +36,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K private final Serde<VR> valueSerde; private final String[] joinThisStoreNames; private final String[] joinOtherStoreNames; - private final StoreBuilder<KeyValueStore<K, VR>> storeBuilder; + private final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder; KTableKTableJoinNode(final String nodeName, final ProcessorParameters<K, Change<V1>> joinThisProcessorParameters, @@ -48,7 +48,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K final Serde<VR> valueSerde, final String[] joinThisStoreNames, final String[] joinOtherStoreNames, - final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) { + final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) { super(nodeName, null, @@ -98,23 +98,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K final String otherProcessorName = otherProcessorParameters().processorName(); final String mergeProcessorName = mergeProcessorParameters().processorName(); - topologyBuilder.addProcessor(thisProcessorName, + topologyBuilder.addProcessor( + thisProcessorName, thisProcessorParameters().processorSupplier(), thisJoinSideNodeName()); - topologyBuilder.addProcessor(otherProcessorName, + topologyBuilder.addProcessor( + otherProcessorName, otherProcessorParameters().processorSupplier(), otherJoinSideNodeName()); - topologyBuilder.addProcessor(mergeProcessorName, + topologyBuilder.addProcessor( + mergeProcessorName, mergeProcessorParameters().processorSupplier(), thisProcessorName, otherProcessorName); - topologyBuilder.connectProcessorAndStateStores(thisProcessorName, - joinOtherStoreNames); - topologyBuilder.connectProcessorAndStateStores(otherProcessorName, - joinThisStoreNames); + topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames); + topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames); if (storeBuilder != null) { topologyBuilder.addStateStore(storeBuilder, mergeProcessorName); @@ -144,7 +145,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K private String[] joinThisStoreNames; private String[] joinOtherStoreNames; private String queryableStoreName; - private StoreBuilder<KeyValueStore<K, VR>> storeBuilder; + private StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder; private KTableKTableJoinNodeBuilder() { } @@ -199,7 +200,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K return this; } - public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) { + public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) { this.storeBuilder = storeBuilder; return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java index df009df..24df4d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java @@ -18,26 +18,26 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import java.util.Arrays; public class TableProcessorNode<K, V> extends StreamsGraphNode { private final ProcessorParameters<K, V> processorParameters; - private final StoreBuilder<KeyValueStore<K, V>> storeBuilder; + private final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder; private final String[] storeNames; public TableProcessorNode(final String nodeName, final ProcessorParameters<K, V> processorParameters, - final StoreBuilder<KeyValueStore<K, V>> storeBuilder) { + final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder) { this(nodeName, processorParameters, storeBuilder, null); } public TableProcessorNode(final String nodeName, final ProcessorParameters<K, V> processorParameters, - final StoreBuilder<KeyValueStore<K, V>> storeBuilder, + final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder, final String[] storeNames) { super(nodeName); this.processorParameters = processorParameters; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index fa979b2..b1df6ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -20,11 +20,12 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import java.util.Collections; @@ -82,10 +83,10 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> { public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { final String topicName = getTopicNames().iterator().next(); - // TODO: we assume source KTables can only be key-value stores for now. + // TODO: we assume source KTables can only be timestamped-key-value stores for now. // should be expanded for other types of stores as well. - final StoreBuilder<KeyValueStore<K, V>> storeBuilder = - new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(); + final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder = + new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(); if (isGlobalKTable) { topologyBuilder.addGlobalStore(storeBuilder, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java index f9cc4c8..dc124aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor; +import java.util.Objects; + /** * This class is used to provide the optional parameters when sending output records to downstream processor * using {@link ProcessorContext#forward(Object, Object, To)}. @@ -65,4 +67,25 @@ public class To { this.timestamp = timestamp; return this; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final To to = (To) o; + return timestamp == to.timestamp && + Objects.equals(childName, to.childName); + } + + /** + * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable. + */ + @Override + public int hashCode() { + throw new UnsupportedOperationException("To is unsafe for use in Hash collections"); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 0693ef7..3c8a3ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -25,10 +25,14 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -50,8 +54,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { public StateStore getStateStore(final String name) { final StateStore store = stateManager.getGlobalStore(name); - if (store instanceof KeyValueStore) { + if (store instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store); + } else if (store instanceof KeyValueStore) { return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); + } else if (store instanceof TimestampedWindowStore) { + return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store); } else if (store instanceof WindowStore) { return new WindowStoreReadWriteDecorator((WindowStore) store); } else if (store instanceof SessionStore) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 232b28d..1df6610 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -498,11 +498,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } - private static class TimestampedKeyValueStoreReadWriteDecorator<K, V> + static class TimestampedKeyValueStoreReadWriteDecorator<K, V> extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>> implements TimestampedKeyValueStore<K, V> { - private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { + TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { super(inner); } } @@ -564,7 +564,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } - private static class TimestampedWindowStoreReadWriteDecorator<K, V> + static class TimestampedWindowStoreReadWriteDecorator<K, V> extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>> implements TimestampedWindowStore<K, V> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java index 8bb652c..f5fc7a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java @@ -25,7 +25,7 @@ import java.util.Objects; * * @param <V> */ -public class ValueAndTimestamp<V> { +public final class ValueAndTimestamp<V> { private final V value; private final long timestamp; @@ -50,6 +50,18 @@ public class ValueAndTimestamp<V> { return value == null ? null : new ValueAndTimestamp<>(value, timestamp); } + /** + * Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter + * if the parameter is not {@code null}. + * + * @param valueAndTimestamp a {@link ValueAndTimestamp} instance; can be {@code null} + * @param <V> the type of the value + * @return the wrapped {@code value} of {@code valueAndTimestamp} if not {@code null}; otherwise {@code null} + */ + public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) { + return valueAndTimestamp == null ? null : valueAndTimestamp.value(); + } + public V value() { return value; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 95e20b4..8aa0ceb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -class CachingKeyValueStore +public class CachingKeyValueStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 2fa7c96..468b554 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -51,6 +51,6 @@ public class MeteredTimestampedKeyValueStore<K, V> serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde); + valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index f43e4e6..863b44b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -19,12 +19,17 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; +import java.util.List; import java.util.Objects; public class TimestampedKeyValueStoreBuilder<K, V> @@ -48,8 +53,12 @@ public class TimestampedKeyValueStoreBuilder<K, V> @Override public TimestampedKeyValueStore<K, V> build() { KeyValueStore<Bytes, byte[]> store = storeSupplier.get(); - if (!(store instanceof TimestampedBytesStore) && store.persistent()) { - store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store); + if (!(store instanceof TimestampedBytesStore)) { + if (store.persistent()) { + store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store); + } else { + store = new InMemoryTimestampedKeyValueStoreMarker(store); + } } return new MeteredTimestampedKeyValueStore<>( maybeWrapCaching(maybeWrapLogging(store)), @@ -72,4 +81,91 @@ public class TimestampedKeyValueStoreBuilder<K, V> } return new ChangeLoggingTimestampedKeyValueBytesStore(inner); } + + private final static class InMemoryTimestampedKeyValueStoreMarker + implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore { + + final KeyValueStore<Bytes, byte[]> wrapped; + + private InMemoryTimestampedKeyValueStoreMarker(final KeyValueStore<Bytes, byte[]> wrapped) { + if (wrapped.persistent()) { + throw new IllegalArgumentException("Provided store must not be a persistent store, but it is."); + } + this.wrapped = wrapped; + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + wrapped.init(context, root); + } + + @Override + public void put(final Bytes key, + final byte[] value) { + wrapped.put(key, value); + } + + @Override + public byte[] putIfAbsent(final Bytes key, + final byte[] value) { + return wrapped.putIfAbsent(key, value); + } + + @Override + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + wrapped.putAll(entries); + } + + @Override + public byte[] delete(final Bytes key) { + return wrapped.delete(key); + } + + @Override + public byte[] get(final Bytes key) { + return wrapped.get(key); + } + + @Override + public KeyValueIterator<Bytes, byte[]> range(final Bytes from, + final Bytes to) { + return wrapped.range(from, to); + } + + @Override + public KeyValueIterator<Bytes, byte[]> all() { + return wrapped.all(); + } + + @Override + public long approximateNumEntries() { + return wrapped.approximateNumEntries(); + } + + @Override + public void flush() { + wrapped.flush(); + } + + @Override + public void close() { + wrapped.close(); + } + + @Override + public boolean isOpen() { + return wrapped.isOpen(); + } + + @Override + public String name() { + return wrapped.name(); + } + + @Override + public boolean persistent() { + return false; + } + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index 2c7c950..808d31e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -19,11 +19,16 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; import java.util.Objects; @@ -36,7 +41,7 @@ public class TimestampedWindowStoreBuilder<K, V> final Serde<K> keySerde, final Serde<V> valueSerde, final Time time) { - super(storeSupplier.name(), keySerde, new ValueAndTimestampSerde<>(valueSerde), time); + super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); this.storeSupplier = storeSupplier; } @@ -44,8 +49,12 @@ public class TimestampedWindowStoreBuilder<K, V> @Override public TimestampedWindowStore<K, V> build() { WindowStore<Bytes, byte[]> store = storeSupplier.get(); - if (!(store instanceof TimestampedBytesStore) && store.persistent()) { - store = new WindowToTimestampedWindowByteStoreAdapter(store); + if (!(store instanceof TimestampedBytesStore)) { + if (store.persistent()) { + store = new WindowToTimestampedWindowByteStoreAdapter(store); + } else { + store = new InMemoryTimestampedWindowStoreMarker(store); + } } return new MeteredTimestampedWindowStore<>( maybeWrapCaching(maybeWrapLogging(store)), @@ -76,4 +85,96 @@ public class TimestampedWindowStoreBuilder<K, V> public long retentionPeriod() { return storeSupplier.retentionPeriod(); } + + + private final static class InMemoryTimestampedWindowStoreMarker + implements WindowStore<Bytes, byte[]>, TimestampedBytesStore { + + private final WindowStore<Bytes, byte[]> wrapped; + + private InMemoryTimestampedWindowStoreMarker(final WindowStore<Bytes, byte[]> wrapped) { + if (wrapped.persistent()) { + throw new IllegalArgumentException("Provided store must not be a persistent store, but it is."); + } + this.wrapped = wrapped; + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + wrapped.init(context, root); + } + + @Override + public void put(final Bytes key, + final byte[] value) { + wrapped.put(key, value); + } + + @Override + public void put(final Bytes key, + final byte[] value, + final long windowStartTimestamp) { + wrapped.put(key, value, windowStartTimestamp); + } + + @Override + public byte[] fetch(final Bytes key, + final long time) { + return wrapped.fetch(key, time); + } + + @SuppressWarnings("deprecation") + @Override + public WindowStoreIterator<byte[]> fetch(final Bytes key, + final long timeFrom, + final long timeTo) { + return wrapped.fetch(key, timeFrom, timeTo); + } + + @SuppressWarnings("deprecation") + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, + final Bytes to, + final long timeFrom, + final long timeTo) { + return wrapped.fetch(from, to, timeFrom, timeTo); + } + + @SuppressWarnings("deprecation") + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, + final long timeTo) { + return wrapped.fetchAll(timeFrom, timeTo); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> all() { + return wrapped.all(); + } + + @Override + public void flush() { + wrapped.flush(); + } + + @Override + public void close() { + wrapped.close(); + } + @Override + public boolean isOpen() { + return wrapped.isOpen(); + } + + @Override + public String name() { + return wrapped.name(); + } + + @Override + public boolean persistent() { + return false; + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index afb2cc1..600f3db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -19,8 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.GenericInMemoryKeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.junit.Test; @@ -45,18 +46,19 @@ public class KTableReduceTest { this::differenceNotNullArgs ).get(); - final KeyValueStore<String, Set<String>> myStore = new GenericInMemoryKeyValueStore<>("myStore"); + final TimestampedKeyValueStore<String, Set<String>> myStore = + new GenericInMemoryTimestampedKeyValueStore<>("myStore"); context.register(myStore, null); reduceProcessor.init(context); context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore"))); reduceProcessor.process("A", new Change<>(singleton("a"), null)); - assertEquals(singleton("a"), myStore.get("A")); + assertEquals(ValueAndTimestamp.make(singleton("a"), -1L), myStore.get("A")); reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a"))); - assertEquals(singleton("b"), myStore.get("A")); + assertEquals(ValueAndTimestamp.make(singleton("b"), -1L), myStore.get("A")); reduceProcessor.process("A", new Change<>(null, singleton("b"))); - assertEquals(emptySet(), myStore.get("A")); + assertEquals(ValueAndTimestamp.make(emptySet(), -1L), myStore.get("A")); } private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java index ed6b649..a9bf7f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java @@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; @@ -90,7 +92,7 @@ public class KTableTransformValuesTest { @Mock(MockType.NICE) private KTableValueGetter<String, String> parentGetter; @Mock(MockType.NICE) - private KeyValueStore<String, String> stateStore; + private TimestampedKeyValueStore<String, String> stateStore; @Mock(MockType.NICE) private ValueTransformerWithKeySupplier<String, String, String> mockSupplier; @Mock(MockType.NICE) @@ -220,7 +222,7 @@ public class KTableTransformValuesTest { new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), QUERYABLE_NAME); expect(context.getStateStore(QUERYABLE_NAME)).andReturn(stateStore); - expect(stateStore.get("Key")).andReturn("something"); + expect(stateStore.get("Key")).andReturn(ValueAndTimestamp.make("something", 0L)); replay(context, stateStore); final KTableValueGetter<String, String> getter = transformValues.view().get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java new file mode 100644 index 0000000..38ef5c6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class TimestampedCacheFlushListenerTest { + + @Test + public void shouldForwardValueTimestampIfNewValueExists() { + final InternalProcessorContext context = mock(InternalProcessorContext.class); + expect(context.currentNode()).andReturn(null).anyTimes(); + context.setCurrentNode(null); + context.setCurrentNode(null); + context.forward( + "key", + new Change<>("newValue", "oldValue"), + To.all().withTimestamp(42L)); + expectLastCall(); + replay(context); + + new TimestampedCacheFlushListener<>(context).apply( + "key", + ValueAndTimestamp.make("newValue", 42L), + ValueAndTimestamp.make("oldValue", 21L), + 73L); + + verify(context); + } + + @Test + public void shouldForwardParameterTimestampIfNewValueIsNull() { + final InternalProcessorContext context = mock(InternalProcessorContext.class); + expect(context.currentNode()).andReturn(null).anyTimes(); + context.setCurrentNode(null); + context.setCurrentNode(null); + context.forward( + "key", + new Change<>(null, "oldValue"), + To.all().withTimestamp(73L)); + expectLastCall(); + replay(context); + + new TimestampedCacheFlushListener<>(context).apply( + "key", + null, + ValueAndTimestamp.make("oldValue", 21L), + 73L); + + verify(context); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java similarity index 66% copy from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java copy to streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java index 68c6eaa..068cb6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.junit.Test; @@ -27,7 +28,7 @@ import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; -public class TupleForwarderTest { +public class TimestampedTupleForwarderTest { @Test public void shouldSetFlushListenerOnWrappedStateStore() { @@ -36,29 +37,38 @@ public class TupleForwarderTest { } private void setFlushListener(final boolean sendOldValues) { - final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class); - final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class); + final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class); + final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class); expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); replay(store); - new TupleForwarder<>(store, null, flushListener, sendOldValues); + new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues); verify(store); } @Test public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() { + shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false); + shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true); + } + + private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); final ProcessorContext context = mock(ProcessorContext.class); - expect(store.setFlushListener(null, false)).andReturn(false); - context.forward("key", new Change<>("value", "oldValue")); + expect(store.setFlushListener(null, sendOldValues)).andReturn(false); + if (sendOldValues) { + context.forward("key", new Change<>("newValue", "oldValue")); + } else { + context.forward("key", new Change<>("newValue", null)); + } expectLastCall(); replay(store, context); - new TupleForwarder<>(store, context, null, false) - .maybeForward("key", "value", "oldValue"); + new TimestampedTupleForwarder<>(store, context, null, sendOldValues) + .maybeForward("key", "newValue", "oldValue"); verify(store, context); } @@ -71,10 +81,9 @@ public class TupleForwarderTest { expect(store.setFlushListener(null, false)).andReturn(true); replay(store, context); - new TupleForwarder<>(store, context, null, false) - .maybeForward("key", "value", "oldValue"); + new TimestampedTupleForwarder<>(store, context, null, false) + .maybeForward("key", "newValue", "oldValue"); verify(store, context); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java index 68c6eaa..f62e826 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java @@ -49,16 +49,25 @@ public class TupleForwarderTest { @Test public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() { + shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false); + shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true); + } + + private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); final ProcessorContext context = mock(ProcessorContext.class); - expect(store.setFlushListener(null, false)).andReturn(false); - context.forward("key", new Change<>("value", "oldValue")); + expect(store.setFlushListener(null, sendOldValues)).andReturn(false); + if (sendOldValues) { + context.forward("key", new Change<>("newValue", "oldValue")); + } else { + context.forward("key", new Change<>("newValue", null)); + } expectLastCall(); replay(store, context); - new TupleForwarder<>(store, context, null, false) - .maybeForward("key", "value", "oldValue"); + new TupleForwarder<>(store, context, null, sendOldValues) + .maybeForward("key", "newValue", "oldValue"); verify(store, context); } @@ -72,7 +81,7 @@ public class TupleForwarderTest { replay(store, context); new TupleForwarder<>(store, context, null, false) - .maybeForward("key", "value", "oldValue"); + .maybeForward("key", "newValue", "oldValue"); verify(store, context); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index c0e0de3..a5db924 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.InternalNameProvider; import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; +import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueStore; @@ -87,7 +87,7 @@ public class GlobalStreamThreadTest { ); builder.addGlobalStore( - new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(), + new TimestampedKeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(), "sourceName", null, null, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 8c8811d..0e50120 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -51,13 +51,15 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.WindowKeySchema; import org.apache.kafka.test.MockKeyValueStore; +import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockRestoreConsumer; import org.apache.kafka.test.MockStateRestoreListener; -import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -352,9 +354,9 @@ public class StandbyTaskTest { assertEquals( asList( - new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), 100L), - new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L), - new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L) + new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), ValueAndTimestamp.make(100L, 60_000L)), + new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)), + new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L)) ), getWindowedStoreContents(storeName, task) ); @@ -368,9 +370,9 @@ public class StandbyTaskTest { // the first record's window should have expired. assertEquals( asList( - new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L), - new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L), - new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), 100L) + new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)), + new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L)), + new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), ValueAndTimestamp.make(100L, 240_000L)) ), getWindowedStoreContents(storeName, task) ); @@ -388,7 +390,7 @@ public class StandbyTaskTest { changelogName, 1, offset, - start, + end, TimestampType.CREATE_TIME, 0L, 0, @@ -449,17 +451,17 @@ public class StandbyTaskTest { } @SuppressWarnings("unchecked") - private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, - final StandbyTask task) { + private List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> getWindowedStoreContents(final String storeName, + final StandbyTask task) { final StandbyContextImpl context = (StandbyContextImpl) task.context(); - final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>(); + final List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> result = new ArrayList<>(); - try (final KeyValueIterator<Windowed<byte[]>, Long> iterator = - ((WindowStore) context.getStateMgr().getStore(storeName)).all()) { + try (final KeyValueIterator<Windowed<byte[]>, ValueAndTimestamp<Long>> iterator = + ((TimestampedWindowStore) context.getStateMgr().getStore(storeName)).all()) { while (iterator.hasNext()) { - final KeyValue<Windowed<byte[]>, Long> next = iterator.next(); + final KeyValue<Windowed<byte[]>, ValueAndTimestamp<Long>> next = iterator.next(); final Integer deserializedKey = new IntegerDeserializer().deserialize(null, next.key.key()); result.add(new KeyValue<>(new Windowed<>(deserializedKey, next.key.window()), next.value)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java similarity index 66% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java index 30080c3..f963786 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java @@ -20,16 +20,18 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.InternalNameProvider; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.CachedStateStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.internals.CachingKeyValueStore; import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; -import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; +import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -44,7 +46,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; @RunWith(EasyMockRunner.class) -public class KeyValueStoreMaterializerTest { +public class TimestampedKeyValueStoreMaterializerTest { private final String storePrefix = "prefix"; @Mock(type = MockType.NICE) @@ -55,14 +57,14 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("store"), nameProvider, storePrefix); - final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); - final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); - final KeyValueStore<String, String> store = builder.build(); + final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized); + final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize(); + final TimestampedKeyValueStore<String, String> store = builder.build(); final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); final StateStore logging = caching.wrapped(); - assertThat(store, instanceOf(MeteredKeyValueStore.class)); - assertThat(caching, instanceOf(CachedStateStore.class)); - assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); + assertThat(store, instanceOf(MeteredTimestampedKeyValueStore.class)); + assertThat(caching, instanceOf(CachingKeyValueStore.class)); + assertThat(logging, instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class)); } @Test @@ -70,9 +72,9 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled(), nameProvider, storePrefix ); - final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); - final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); - final KeyValueStore<String, String> store = builder.build(); + final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized); + final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize(); + final TimestampedKeyValueStore<String, String> store = builder.build(); final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); } @@ -82,11 +84,11 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withLoggingDisabled(), nameProvider, storePrefix ); - final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); - final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); - final KeyValueStore<String, String> store = builder.build(); + final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized); + final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize(); + final TimestampedKeyValueStore<String, String> store = builder.build(); final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); - assertThat(caching, instanceOf(CachedStateStore.class)); + assertThat(caching, instanceOf(CachingKeyValueStore.class)); assertThat(caching.wrapped(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); } @@ -95,11 +97,11 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, storePrefix ); - final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); - final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); - final KeyValueStore<String, String> store = builder.build(); + final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized); + final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize(); + final TimestampedKeyValueStore<String, String> store = builder.build(); final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertThat(wrapped, not(instanceOf(CachedStateStore.class))); + assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class))); assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); } @@ -113,9 +115,9 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, storePrefix); - final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized); - final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize(); - final KeyValueStore<String, Integer> built = builder.build(); + final TimestampedKeyValueStoreMaterializer<String, Integer> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized); + final StoreBuilder<TimestampedKeyValueStore<String, Integer>> builder = materializer.materialize(); + final TimestampedKeyValueStore<String, Integer> built = builder.build(); assertThat(store.name(), CoreMatchers.equalTo(built.name())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index e520df4..8c83271 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -218,6 +218,17 @@ public class StoresTest { } @Test + public void shouldBuildTimestampedKeyValueStoreThatWrapsInMemoryKeyValueStore() { + final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder( + Stores.inMemoryKeyValueStore("name"), + Serdes.String(), + Serdes.String() + ).withLoggingDisabled().withCachingDisabled().build(); + assertThat(store, not(nullValue())); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class)); + } + + @Test public void shouldBuildWindowStore() { final WindowStore<String, String> store = Stores.windowStoreBuilder( Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true), @@ -238,6 +249,27 @@ public class StoresTest { } @Test + public void shouldBuildTimestampedWindowStoreThatWrapsWindowStore() { + final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder( + Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true), + Serdes.String(), + Serdes.String() + ).build(); + assertThat(store, not(nullValue())); + } + + @Test + public void shouldBuildTimestampedWindowStoreThatWrapsInMemroyWindowStore() { + final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder( + Stores.inMemoryWindowStore("store", ofMillis(3L), ofMillis(3L), true), + Serdes.String(), + Serdes.String() + ).withLoggingDisabled().withCachingDisabled().build(); + assertThat(store, not(nullValue())); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class)); + } + + @Test public void shouldBuildSessionStore() { final SessionStore<String, String> store = Stores.sessionStoreBuilder( Stores.persistentSessionStore("name", ofMillis(10)), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index 7b0eb6d..1ba3d37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -18,11 +18,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.easymock.EasyMockRunner; import org.easymock.Mock; @@ -46,14 +44,16 @@ public class TimestampedKeyValueStoreBuilderTest { @Mock(type = MockType.NICE) private KeyValueBytesStoreSupplier supplier; @Mock(type = MockType.NICE) - private KeyValueStore<Bytes, byte[]> inner; + private RocksDBTimestampedStore inner; private TimestampedKeyValueStoreBuilder<String, String> builder; @Before public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); - replay(supplier); + expect(inner.persistent()).andReturn(true).anyTimes(); + replay(supplier, inner); + builder = new TimestampedKeyValueStoreBuilder<>( supplier, Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index 7be31ea..e6d3da5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -18,12 +18,10 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; -import org.apache.kafka.streams.state.WindowStore; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -46,14 +44,15 @@ public class TimestampedWindowStoreBuilderTest { @Mock(type = MockType.NICE) private WindowBytesStoreSupplier supplier; @Mock(type = MockType.NICE) - private WindowStore<Bytes, byte[]> inner; + private RocksDBTimestampedWindowStore inner; private TimestampedWindowStoreBuilder<String, String> builder; @Before public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); - replay(supplier); + expect(inner.persistent()).andReturn(true).anyTimes(); + replay(supplier, inner); builder = new TimestampedWindowStoreBuilder<>( supplier, diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java new file mode 100644 index 0000000..67a67c9 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.CacheFlushListener; +import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator; +import org.apache.kafka.streams.state.internals.WrappedStateStore; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * This class is a generic version of the in-memory key-value store that is useful for testing when you + * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde + */ +public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V> + extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>> + implements TimestampedKeyValueStore<K, V> { + + private final String name; + private final NavigableMap<K, ValueAndTimestamp<V>> map; + private volatile boolean open = false; + + public GenericInMemoryTimestampedKeyValueStore(final String name) { + // it's not really a `WrappedStateStore` so we pass `null` + // however, we need to implement `WrappedStateStore` to make the store usable + super(null); + this.name = name; + + this.map = new TreeMap<>(); + } + + @Override + public String name() { + return this.name; + } + + @Override + @SuppressWarnings("unchecked") + /* This is a "dummy" store used for testing; + it does not support restoring from changelog since we allow it to be serde-ignorant */ + public void init(final ProcessorContext context, final StateStore root) { + if (root != null) { + context.register(root, null); + } + + this.open = true; + } + + @Override + public boolean setFlushListener(final CacheFlushListener<K, ValueAndTimestamp<V>> listener, + final boolean sendOldValues) { + return false; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public synchronized ValueAndTimestamp<V> get(final K key) { + return this.map.get(key); + } + + @Override + public synchronized void put(final K key, + final ValueAndTimestamp<V> value) { + if (value == null) { + this.map.remove(key); + } else { + this.map.put(key, value); + } + } + + @Override + public synchronized ValueAndTimestamp<V> putIfAbsent(final K key, + final ValueAndTimestamp<V> value) { + final ValueAndTimestamp<V> originalValue = get(key); + if (originalValue == null) { + put(key, value); + } + return originalValue; + } + + @Override + public synchronized void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> entries) { + for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) { + put(entry.key, entry.value); + } + } + + @Override + public synchronized ValueAndTimestamp<V> delete(final K key) { + return this.map.remove(key); + } + + @Override + public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from, + final K to) { + return new DelegatingPeekingKeyValueIterator<>( + name, + new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + } + + @Override + public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> all() { + final TreeMap<K, ValueAndTimestamp<V>> copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator())); + } + + @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + this.map.clear(); + this.open = false; + } + + private static class GenericInMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, ValueAndTimestamp<V>> { + private final Iterator<Entry<K, ValueAndTimestamp<V>>> iter; + + private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K, ValueAndTimestamp<V>>> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue<K, ValueAndTimestamp<V>> next() { + final Map.Entry<K, ValueAndTimestamp<V>> entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} \ No newline at end of file