This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 517b5d2b09e KAFKA-14491: [12/N] Relax requirement that KTable stores
must be TimestampedKVStores (#13264)
517b5d2b09e is described below
commit 517b5d2b09ec22cf1734ba1e2d8be9ece5fb0365
Author: Victoria Xia <[email protected]>
AuthorDate: Thu Mar 2 14:14:30 2023 -0800
KAFKA-14491: [12/N] Relax requirement that KTable stores must be
TimestampedKVStores (#13264)
As part of introducing versioned key-value stores in KIP-889, we want to
lift the existing DSL restriction that KTable stores are always
TimestampedKeyValueStores to allow for KTable stores which are
VersionedKeyValueStores instead. This PR lifts this restriction by replacing
raw usages of TimestampedKeyValueStore with a new KeyValueStoreWrapper which
supports either TimestampedKeyValueStore or VersionedKeyValueStore.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kstream/internals/KStreamAggregate.java | 14 +-
.../streams/kstream/internals/KStreamReduce.java | 14 +-
.../streams/kstream/internals/KTableAggregate.java | 10 +-
.../streams/kstream/internals/KTableFilter.java | 10 +-
.../streams/kstream/internals/KTableImpl.java | 10 +-
.../kstream/internals/KTableKTableJoinMerger.java | 11 +-
.../streams/kstream/internals/KTableMapValues.java | 10 +-
.../KTableMaterializedValueGetterSupplier.java | 6 +-
.../kstream/internals/KTablePassThrough.java | 6 +-
.../streams/kstream/internals/KTableReduce.java | 10 +-
.../streams/kstream/internals/KTableSource.java | 10 +-
.../internals/KTableSourceValueGetterSupplier.java | 6 +-
.../kstream/internals/KTableTransformValues.java | 10 +-
.../internals/graph/KTableKTableJoinNode.java | 9 +-
.../kstream/internals/graph/StreamToTableNode.java | 3 +-
.../internals/graph/TableProcessorNode.java | 8 +-
.../kstream/internals/graph/TableSourceNode.java | 5 +-
.../state/internals/KeyValueStoreWrapper.java | 145 +++++++++
.../state/internals/KeyValueStoreWrapperTest.java | 356 +++++++++++++++++++++
19 files changed, 573 insertions(+), 80 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 109160de507..27a3d488918 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -25,8 +25,8 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +63,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
private class KStreamAggregateProcessor extends ContextualProcessor<KIn,
VIn, KIn, Change<VAgg>> {
- private TimestampedKeyValueStore<KIn, VAgg> store;
+ private KeyValueStoreWrapper<KIn, VAgg> store;
private Sensor droppedRecordsSensor;
private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
@@ -74,9 +74,9 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
Thread.currentThread().getName(),
context.taskId().toString(),
(StreamsMetricsImpl) context.metrics());
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -118,7 +118,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
- store.put(record.key(), ValueAndTimestamp.make(newAgg,
newTimestamp));
+ store.put(record.key(), newAgg, newTimestamp);
tupleForwarder.maybeForward(
record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg :
null))
.withTimestamp(newTimestamp));
@@ -141,11 +141,11 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
}
private class KStreamAggregateValueGetter implements
KTableValueGetter<KIn, VAgg> {
- private TimestampedKeyValueStore<KIn, VAgg> store;
+ private KeyValueStoreWrapper<KIn, VAgg> store;
@Override
public void init(final ProcessorContext<?, ?> context) {
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index e936855aac9..b801d2b60ea 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -24,8 +24,8 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
private class KStreamReduceProcessor extends ContextualProcessor<K, V, K,
Change<V>> {
- private TimestampedKeyValueStore<K, V> store;
+ private KeyValueStoreWrapper<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private Sensor droppedRecordsSensor;
@@ -70,9 +70,9 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
context.taskId().toString(),
(StreamsMetricsImpl) context.metrics()
);
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -112,7 +112,7 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
newTimestamp = Math.max(record.timestamp(),
oldAggAndTimestamp.timestamp());
}
- store.put(record.key(), ValueAndTimestamp.make(newAgg,
newTimestamp));
+ store.put(record.key(), newAgg, newTimestamp);
tupleForwarder.maybeForward(
record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg :
null))
.withTimestamp(newTimestamp));
@@ -136,11 +136,11 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
- private TimestampedKeyValueStore<K, V> store;
+ private KeyValueStoreWrapper<K, V> store;
@Override
public void init(final ProcessorContext<?, ?> context) {
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index f10af467404..75442555cc2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -60,15 +60,15 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
}
private class KTableAggregateProcessor implements Processor<KIn,
Change<VIn>, KIn, Change<VAgg>> {
- private TimestampedKeyValueStore<KIn, VAgg> store;
+ private KeyValueStoreWrapper<KIn, VAgg> store;
private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
- store = (TimestampedKeyValueStore<KIn, VAgg>)
context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -116,7 +116,7 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
}
// update the store with the new value
- store.put(record.key(), ValueAndTimestamp.make(newAgg,
newTimestamp));
+ store.put(record.key(), newAgg, newTimestamp);
tupleForwarder.maybeForward(
record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg :
null))
.withTimestamp(newTimestamp));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 99f50be1af2..6287aa40c55 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -88,16 +88,16 @@ class KTableFilter<KIn, VIn> implements
KTableProcessorSupplier<KIn, VIn, KIn, V
private class KTableFilterProcessor implements Processor<KIn, Change<VIn>,
KIn, Change<VIn>> {
private ProcessorContext<KIn, Change<VIn>> context;
- private TimestampedKeyValueStore<KIn, VIn> store;
+ private KeyValueStoreWrapper<KIn, VIn> store;
private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
@Override
public void init(final ProcessorContext<KIn, Change<VIn>> context) {
this.context = context;
if (queryableName != null) {
- store = context.getStateStore(queryableName);
+ store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -117,7 +117,7 @@ class KTableFilter<KIn, VIn> implements
KTableProcessorSupplier<KIn, VIn, KIn, V
}
if (queryableName != null) {
- store.put(key, ValueAndTimestamp.make(newValue,
record.timestamp()));
+ store.put(key, newValue, record.timestamp());
tupleForwarder.maybeForward(record.withValue(new
Change<>(newValue, oldValue)));
} else {
context.forward(record.withValue(new Change<>(newValue,
oldValue)));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e34ac2f5841..c2786257a74 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -175,7 +175,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final Serde<K> keySerde;
final Serde<V> valueSerde;
final String queryableStoreName;
- final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
+ final StoreBuilder<?> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if
it is not specified, we will not
@@ -290,7 +290,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<?> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if
it is not specified, we will not
@@ -445,7 +445,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<?> storeBuilder;
if (materializedInternal != null) {
// don't inherit parent value serde, since this operation may
change the value type, more specifically:
@@ -750,7 +750,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<?> storeBuilder;
if (materializedInternal != null) {
if (materializedInternal.keySerde() == null) {
@@ -1270,7 +1270,7 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
materializedInternal.queryableStoreName()
);
- final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
+ final StoreBuilder<?> resultStore =
new
TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 72dbe40b7f1..2ed1dc42a97 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -20,12 +20,11 @@ import
org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
@@ -98,7 +97,7 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
}
private class KTableKTableJoinMergeProcessor extends
ContextualProcessor<K, Change<V>, K, Change<V>> {
- private TimestampedKeyValueStore<K, V> store;
+ private KeyValueStoreWrapper<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@@ -106,9 +105,9 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
public void init(final ProcessorContext<K, Change<V>> context) {
super.init(context);
if (queryableName != null) {
- store = (TimestampedKeyValueStore<K, V>)
context.getStateStore(queryableName);
+ store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -118,7 +117,7 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
@Override
public void process(final Record<K, Change<V>> record) {
if (queryableName != null) {
- store.put(record.key(),
ValueAndTimestamp.make(record.value().newValue, record.timestamp()));
+ store.put(record.key(), record.value().newValue,
record.timestamp());
tupleForwarder.maybeForward(record);
} else {
if (sendOldValues) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index acea6c0cf2c..f0897dcdb05 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -106,16 +106,16 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
private class KTableMapValuesProcessor implements Processor<KIn,
Change<VIn>, KIn, Change<VOut>> {
private ProcessorContext<KIn, Change<VOut>> context;
- private TimestampedKeyValueStore<KIn, VOut> store;
+ private KeyValueStoreWrapper<KIn, VOut> store;
private TimestampedTupleForwarder<KIn, VOut> tupleForwarder;
@Override
public void init(final ProcessorContext<KIn, Change<VOut>> context) {
this.context = context;
if (queryableName != null) {
- store = context.getStateStore(queryableName);
+ store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -128,7 +128,7 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
final VOut oldValue = computeOldValue(record.key(),
record.value());
if (queryableName != null) {
- store.put(record.key(), ValueAndTimestamp.make(newValue,
record.timestamp()));
+ store.put(record.key(), newValue, record.timestamp());
tupleForwarder.maybeForward(record.withValue(new
Change<>(newValue, oldValue)));
} else {
context.forward(record.withValue(new Change<>(newValue,
oldValue)));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index e7570366f24..ba7e65081d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
public class KTableMaterializedValueGetterSupplier<K, V> implements
KTableValueGetterSupplier<K, V> {
private final String storeName;
@@ -37,11 +37,11 @@ public class KTableMaterializedValueGetterSupplier<K, V>
implements KTableValueG
}
private class KTableMaterializedValueGetter implements
KTableValueGetter<K, V> {
- private TimestampedKeyValueStore<K, V> store;
+ private KeyValueStoreWrapper<K, V> store;
@Override
public void init(final ProcessorContext<?, ?> context) {
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
index a28b973d75f..91fe0e4a277 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Collection;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
public class KTablePassThrough<KIn, VIn> implements
KTableProcessorSupplier<KIn, VIn, KIn, VIn> {
private final Collection<KStreamAggProcessorSupplier> parents;
@@ -79,11 +79,11 @@ public class KTablePassThrough<KIn, VIn> implements
KTableProcessorSupplier<KIn,
}
private class KTablePassThroughValueGetter implements
KTableValueGetter<KIn, VIn> {
- private TimestampedKeyValueStore<KIn, VIn> store;
+ private KeyValueStoreWrapper<KIn, VIn> store;
@Override
public void init(final ProcessorContext<?, ?> context) {
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 08d04494957..ef40638dd28 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -54,15 +54,15 @@ public class KTableReduce<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
private class KTableReduceProcessor implements Processor<K, Change<V>, K,
Change<V>> {
- private TimestampedKeyValueStore<K, V> store;
+ private KeyValueStoreWrapper<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<K, Change<V>> context) {
- store = (TimestampedKeyValueStore<K, V>)
context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -106,7 +106,7 @@ public class KTableReduce<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
}
// update the store with the new value
- store.put(record.key(), ValueAndTimestamp.make(newAgg,
newTimestamp));
+ store.put(record.key(), newAgg, newTimestamp);
tupleForwarder.maybeForward(
record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg :
null))
.withTimestamp(newTimestamp));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 6de8ede316b..7856c91d839 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -26,8 +26,8 @@ import
org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +76,7 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
private class KTableSourceProcessor implements Processor<KIn, VIn, KIn,
Change<VIn>> {
private ProcessorContext<KIn, Change<VIn>> context;
- private TimestampedKeyValueStore<KIn, VIn> store;
+ private KeyValueStoreWrapper<KIn, VIn> store;
private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
private Sensor droppedRecordsSensor;
@@ -88,9 +88,9 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
if (queryableName != null) {
- store = context.getStateStore(queryableName);
+ store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -146,7 +146,7 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
} else {
oldValue = null;
}
- store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()));
+ store.put(record.key(), record.value(), record.timestamp());
tupleForwarder.maybeForward(record.withValue(new
Change<>(record.value(), oldValue)));
} else {
context.forward(record.withValue(new Change<>(record.value(),
null)));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 94701318fa2..1c340943354 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -18,8 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
public class KTableSourceValueGetterSupplier<K, V> implements
KTableValueGetterSupplier<K, V> {
private final String storeName;
@@ -38,10 +38,10 @@ public class KTableSourceValueGetterSupplier<K, V>
implements KTableValueGetterS
}
private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
- private TimestampedKeyValueStore<K, V> store = null;
+ private KeyValueStoreWrapper<K, V> store;
public void init(final ProcessorContext<?, ?> context) {
- store = context.getStateStore(storeName);
+ store = new KeyValueStoreWrapper<>(context, storeName);
}
public ValueAndTimestamp<V> get(final K key) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 6aac1d4183b..5975571f0fa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -26,10 +26,10 @@ import org.apache.kafka.streams.processor.api.Record;
import
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -90,7 +90,7 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
private class KTableTransformValuesProcessor extends
ContextualProcessor<K, Change<V>, K, Change<VOut>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends
VOut> valueTransformer;
- private TimestampedKeyValueStore<K, VOut> store;
+ private KeyValueStoreWrapper<K, VOut> store;
private TimestampedTupleForwarder<K, VOut> tupleForwarder;
private KTableTransformValuesProcessor(final ValueTransformerWithKey<?
super K, ? super V, ? extends VOut> valueTransformer) {
@@ -103,9 +103,9 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
final InternalProcessorContext<K, Change<VOut>>
internalProcessorContext = (InternalProcessorContext<K, Change<VOut>>) context;
valueTransformer.init(new
ForwardingDisabledProcessorContext(internalProcessorContext));
if (queryableName != null) {
- store = context.getStateStore(queryableName);
+ store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store,
+ store.getStore(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
@@ -121,7 +121,7 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
context().forward(record.withValue(new Change<>(newValue,
oldValue)));
} else {
final VOut oldValue = sendOldValues ?
getValueOrNull(store.get(record.key())) : null;
- store.put(record.key(), ValueAndTimestamp.make(newValue,
record.timestamp()));
+ store.put(record.key(), newValue, record.timestamp());
tupleForwarder.maybeForward(record.withValue(new
Change<>(newValue, oldValue)));
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 2772a668108..dec7b437b2c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -24,7 +24,6 @@ import
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
@@ -37,7 +36,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private final Serde<VR> valueSerde;
private final String[] joinThisStoreNames;
private final String[] joinOtherStoreNames;
- private final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+ private final StoreBuilder<?> storeBuilder;
KTableKTableJoinNode(final String nodeName,
final ProcessorParameters<K, Change<V1>, ?, ?>
joinThisProcessorParameters,
@@ -49,7 +48,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
final Serde<VR> valueSerde,
final String[] joinThisStoreNames,
final String[] joinOtherStoreNames,
- final StoreBuilder<TimestampedKeyValueStore<K, VR>>
storeBuilder) {
+ final StoreBuilder<?> storeBuilder) {
super(nodeName,
null,
@@ -148,7 +147,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private String[] joinThisStoreNames;
private String[] joinOtherStoreNames;
private String queryableStoreName;
- private StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+ private StoreBuilder<?> storeBuilder;
private KTableKTableJoinNodeBuilder() {
}
@@ -203,7 +202,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
return this;
}
- public KTableKTableJoinNodeBuilder<K, V1, V2, VR>
withStoreBuilder(final StoreBuilder<TimestampedKeyValueStore<K, VR>>
storeBuilder) {
+ public KTableKTableJoinNodeBuilder<K, V1, V2, VR>
withStoreBuilder(final StoreBuilder<?> storeBuilder) {
this.storeBuilder = storeBuilder;
return this;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 8c071dad229..6d79deb8105 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -24,7 +24,6 @@ import
org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMateri
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
/**
* Represents a KTable convert From KStream
@@ -53,7 +52,7 @@ public class StreamToTableNode<K, V> extends GraphNode {
@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
- final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+ final StoreBuilder<?> storeBuilder =
new
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
final String processorName = processorParameters.processorName();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index c522b5a9f3f..5b25fb682f1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
import java.util.Objects;
@@ -28,19 +27,18 @@ import java.util.Objects;
public class TableProcessorNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
- private final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
+ private final StoreBuilder<?> storeBuilder;
private final String[] storeNames;
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?>
processorParameters,
- final StoreBuilder<TimestampedKeyValueStore<K,
V>> storeBuilder) {
+ final StoreBuilder<?> storeBuilder) {
this(nodeName, processorParameters, storeBuilder, null);
}
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?>
processorParameters,
- // TODO KIP-300: we are enforcing this as a
keyvalue store, but it should go beyond any type of stores
- final StoreBuilder<TimestampedKeyValueStore<K,
V>> storeBuilder,
+ final StoreBuilder<?> storeBuilder,
final String[] storeNames) {
super(nodeName);
this.processorParameters = processorParameters;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 52413b9b61b..e218263e9b4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -27,7 +27,6 @@ import
org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Collections;
@@ -94,9 +93,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
throw new IllegalStateException("A table source node must have a
single topic as input");
}
- // TODO: we assume source KTables can only be timestamped-key-value
stores for now.
- // should be expanded for other types of stores as well.
- final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+ final StoreBuilder<?> storeBuilder =
new
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
if (isGlobalKTable) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
new file mode 100644
index 00000000000..9c2920570aa
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * <p>The wrapper offers one timestamp-aware {@link #get(Object)} / {@link #put(Object, Object, long)}
+ * API on top of whichever of the two store types is registered under the given name, and
+ * forwards every {@link StateStore} method to that underlying store unchanged.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    // exactly one of the following two fields is non-null once the constructor has returned
+    private final TimestampedKeyValueStore<K, V> timestampedStore;
+    private final VersionedKeyValueStore<K, V> versionedStore;
+
+    // same as either timestampedStore or versionedStore above. kept merely as a convenience
+    // to simplify implementation for methods which do not depend on store type.
+    private final StateStore store;
+
+    /**
+     * Looks up the store registered under {@code storeName} and wraps it.
+     *
+     * <p>The store type is determined with explicit {@code instanceof} checks rather than by
+     * catching {@link ClassCastException}: a {@code null} lookup result never fails a cast, so
+     * the cast-based approach would silently build a wrapper around nothing and only fail on
+     * first use. If a store implements both interfaces, the timestamped view is preferred.
+     *
+     * @param context   the processor context used to look up the store
+     * @param storeName the name of the store to wrap
+     * @throws InvalidStateStoreException if the lookup returns {@code null} or a store that is
+     *                                    neither a {@link TimestampedKeyValueStore} nor a
+     *                                    {@link VersionedKeyValueStore}
+     */
+    @SuppressWarnings("unchecked") // generic type arguments cannot be checked at runtime
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        final StateStore resolved = context.getStateStore(storeName);
+
+        if (resolved instanceof TimestampedKeyValueStore) {
+            // first preference: timestamped store
+            timestampedStore = (TimestampedKeyValueStore<K, V>) resolved;
+            versionedStore = null;
+        } else if (resolved instanceof VersionedKeyValueStore) {
+            // otherwise: versioned store
+            timestampedStore = null;
+            versionedStore = (VersionedKeyValueStore<K, V>) resolved;
+        } else {
+            // covers both a missing (null) store and a store of an unsupported type
+            final String storeType = resolved == null ? "null" : resolved.getClass().getName();
+            throw new InvalidStateStoreException("KTable source state store must implement either "
+                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " + storeType);
+        }
+        store = resolved;
+    }
+
+    /**
+     * Reads the current value for {@code key}.
+     *
+     * @param key the key to look up
+     * @return the value and its timestamp, or {@code null} if the underlying store returns
+     *         {@code null}; a {@link VersionedRecord} from a versioned store is converted
+     *         via {@link ValueAndTimestamp#make(Object, long)}
+     */
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        }
+        // defensive only: the constructor guarantees one of the two stores is set
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    /**
+     * Writes {@code value} for {@code key} with the given timestamp.
+     *
+     * <p>For a timestamped store the value and timestamp are combined with
+     * {@link ValueAndTimestamp#make(Object, long)}; for a versioned store they are passed
+     * through as separate arguments.
+     *
+     * @param key       the key to write
+     * @param value     the value to write
+     * @param timestamp the timestamp to associate with the value
+     */
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        // defensive only: the constructor guarantees one of the two stores is set
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    /**
+     * @return the wrapped store itself (the timestamped or the versioned store instance)
+     */
+    public StateStore getStore() {
+        return store;
+    }
+
+    // ---- StateStore methods: all delegate directly to the wrapped store ----
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Deprecated
+    @Override
+    public void init(final org.apache.kafka.streams.processor.ProcessorContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        store.flush();
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return store.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
+        return store.query(query, positionBound, config);
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
new file mode 100644
index 00000000000..869d4858f75
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link KeyValueStoreWrapper}. Apart from the rejection of unsupported store
+ * types, every behaviour is checked twice: once with the wrapper built around a mocked
+ * {@link TimestampedKeyValueStore} and once around a mocked {@link VersionedKeyValueStore}.
+ */
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KeyValueStoreWrapperTest {
+
+    private static final String STORE_NAME = "kvStore";
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 8L;
+    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP
+        = ValueAndTimestamp.make(VALUE, TIMESTAMP);
+
+    @Mock
+    private TimestampedKeyValueStore<String, String> timestampedStore;
+    @Mock
+    private VersionedKeyValueStore<String, String> versionedStore;
+    @Mock
+    private ProcessorContext context;
+    @Mock
+    private Query query;
+    @Mock
+    private PositionBound positionBound;
+    @Mock
+    private QueryConfig queryConfig;
+    @Mock
+    private QueryResult result;
+    @Mock
+    private Position position;
+
+    private KeyValueStoreWrapper<String, String> wrapper;
+
+    // ---- construction ----
+
+    @Test
+    public void shouldThrowOnNonTimestampedOrVersionedStore() {
+        // a plain KeyValueStore is neither timestamped nor versioned
+        final KeyValueStore<?, ?> unsupportedStore = mock(KeyValueStore.class);
+        when(context.getStateStore(STORE_NAME)).thenReturn(unsupportedStore);
+
+        assertThrows(InvalidStateStoreException.class, () -> new KeyValueStoreWrapper<>(context, STORE_NAME));
+    }
+
+    // ---- get ----
+
+    @Test
+    public void shouldGetFromTimestampedStore() {
+        wrapTimestampedStore();
+        when(timestampedStore.get(KEY)).thenReturn(VALUE_AND_TIMESTAMP);
+
+        final ValueAndTimestamp<String> actual = wrapper.get(KEY);
+
+        assertThat(actual, equalTo(VALUE_AND_TIMESTAMP));
+    }
+
+    @Test
+    public void shouldGetFromVersionedStore() {
+        wrapVersionedStore();
+        final VersionedRecord<String> storedRecord = new VersionedRecord<>(VALUE, TIMESTAMP);
+        when(versionedStore.get(KEY)).thenReturn(storedRecord);
+
+        final ValueAndTimestamp<String> actual = wrapper.get(KEY);
+
+        assertThat(actual, equalTo(VALUE_AND_TIMESTAMP));
+    }
+
+    @Test
+    public void shouldGetNullFromTimestampedStore() {
+        wrapTimestampedStore();
+        when(timestampedStore.get(KEY)).thenReturn(null);
+
+        final ValueAndTimestamp<String> actual = wrapper.get(KEY);
+
+        assertThat(actual, nullValue());
+    }
+
+    @Test
+    public void shouldGetNullFromVersionedStore() {
+        wrapVersionedStore();
+        when(versionedStore.get(KEY)).thenReturn(null);
+
+        final ValueAndTimestamp<String> actual = wrapper.get(KEY);
+
+        assertThat(actual, nullValue());
+    }
+
+    // ---- put ----
+
+    @Test
+    public void shouldPutToTimestampedStore() {
+        wrapTimestampedStore();
+
+        wrapper.put(KEY, VALUE, TIMESTAMP);
+
+        verify(timestampedStore).put(KEY, VALUE_AND_TIMESTAMP);
+    }
+
+    @Test
+    public void shouldPutToVersionedStore() {
+        wrapVersionedStore();
+
+        wrapper.put(KEY, VALUE, TIMESTAMP);
+
+        verify(versionedStore).put(KEY, VALUE, TIMESTAMP);
+    }
+
+    @Test
+    public void shouldPutNullToTimestampedStore() {
+        wrapTimestampedStore();
+
+        wrapper.put(KEY, null, TIMESTAMP);
+
+        // a null value reaches the timestamped store as a null ValueAndTimestamp
+        verify(timestampedStore).put(KEY, null);
+    }
+
+    @Test
+    public void shouldPutNullToVersionedStore() {
+        wrapVersionedStore();
+
+        wrapper.put(KEY, null, TIMESTAMP);
+
+        verify(versionedStore).put(KEY, null, TIMESTAMP);
+    }
+
+    // ---- getStore / name ----
+
+    @Test
+    public void shouldGetTimestampedStore() {
+        wrapTimestampedStore();
+
+        assertThat(wrapper.getStore(), equalTo(timestampedStore));
+    }
+
+    @Test
+    public void shouldGetVersionedStore() {
+        wrapVersionedStore();
+
+        assertThat(wrapper.getStore(), equalTo(versionedStore));
+    }
+
+    @Test
+    public void shouldGetNameForTimestampedStore() {
+        wrapTimestampedStore();
+        when(timestampedStore.name()).thenReturn(STORE_NAME);
+
+        final String actualName = wrapper.name();
+
+        assertThat(actualName, equalTo(STORE_NAME));
+    }
+
+    @Test
+    public void shouldGetNameForVersionedStore() {
+        wrapVersionedStore();
+        when(versionedStore.name()).thenReturn(STORE_NAME);
+
+        final String actualName = wrapper.name();
+
+        assertThat(actualName, equalTo(STORE_NAME));
+    }
+
+    // ---- init (deprecated and current overloads) ----
+
+    @Deprecated
+    @Test
+    public void shouldDeprecatedInitTimestampedStore() {
+        wrapTimestampedStore();
+        final org.apache.kafka.streams.processor.ProcessorContext legacyContext
+            = mock(org.apache.kafka.streams.processor.ProcessorContext.class);
+
+        wrapper.init(legacyContext, wrapper);
+
+        verify(timestampedStore).init(legacyContext, wrapper);
+    }
+
+    @Deprecated
+    @Test
+    public void shouldDeprecatedInitVersionedStore() {
+        wrapVersionedStore();
+        final org.apache.kafka.streams.processor.ProcessorContext legacyContext
+            = mock(org.apache.kafka.streams.processor.ProcessorContext.class);
+
+        wrapper.init(legacyContext, wrapper);
+
+        verify(versionedStore).init(legacyContext, wrapper);
+    }
+
+    @Test
+    public void shouldInitTimestampedStore() {
+        wrapTimestampedStore();
+        final StateStoreContext storeContext = mock(StateStoreContext.class);
+
+        wrapper.init(storeContext, wrapper);
+
+        verify(timestampedStore).init(storeContext, wrapper);
+    }
+
+    @Test
+    public void shouldInitVersionedStore() {
+        wrapVersionedStore();
+        final StateStoreContext storeContext = mock(StateStoreContext.class);
+
+        wrapper.init(storeContext, wrapper);
+
+        verify(versionedStore).init(storeContext, wrapper);
+    }
+
+    // ---- flush / close ----
+
+    @Test
+    public void shouldFlushTimestampedStore() {
+        wrapTimestampedStore();
+
+        wrapper.flush();
+
+        verify(timestampedStore).flush();
+    }
+
+    @Test
+    public void shouldFlushVersionedStore() {
+        wrapVersionedStore();
+
+        wrapper.flush();
+
+        verify(versionedStore).flush();
+    }
+
+    @Test
+    public void shouldCloseTimestampedStore() {
+        wrapTimestampedStore();
+
+        wrapper.close();
+
+        verify(timestampedStore).close();
+    }
+
+    @Test
+    public void shouldCloseVersionedStore() {
+        wrapVersionedStore();
+
+        wrapper.close();
+
+        verify(versionedStore).close();
+    }
+
+    // ---- persistent / isOpen: both boolean outcomes are checked in turn ----
+
+    @Test
+    public void shouldReturnPersistentForTimestampedStore() {
+        wrapTimestampedStore();
+
+        // underlying store reports persistent
+        when(timestampedStore.persistent()).thenReturn(true);
+        assertThat(wrapper.persistent(), equalTo(true));
+
+        // underlying store reports non-persistent
+        when(timestampedStore.persistent()).thenReturn(false);
+        assertThat(wrapper.persistent(), equalTo(false));
+    }
+
+    @Test
+    public void shouldReturnPersistentForVersionedStore() {
+        wrapVersionedStore();
+
+        // underlying store reports persistent
+        when(versionedStore.persistent()).thenReturn(true);
+        assertThat(wrapper.persistent(), equalTo(true));
+
+        // underlying store reports non-persistent
+        when(versionedStore.persistent()).thenReturn(false);
+        assertThat(wrapper.persistent(), equalTo(false));
+    }
+
+    @Test
+    public void shouldReturnIsOpenForTimestampedStore() {
+        wrapTimestampedStore();
+
+        // underlying store reports open
+        when(timestampedStore.isOpen()).thenReturn(true);
+        assertThat(wrapper.isOpen(), equalTo(true));
+
+        // underlying store reports closed
+        when(timestampedStore.isOpen()).thenReturn(false);
+        assertThat(wrapper.isOpen(), equalTo(false));
+    }
+
+    @Test
+    public void shouldReturnIsOpenForVersionedStore() {
+        wrapVersionedStore();
+
+        // underlying store reports open
+        when(versionedStore.isOpen()).thenReturn(true);
+        assertThat(wrapper.isOpen(), equalTo(true));
+
+        // underlying store reports closed
+        when(versionedStore.isOpen()).thenReturn(false);
+        assertThat(wrapper.isOpen(), equalTo(false));
+    }
+
+    // ---- query / getPosition ----
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldQueryTimestampedStore() {
+        wrapTimestampedStore();
+        when(timestampedStore.query(query, positionBound, queryConfig)).thenReturn(result);
+
+        assertThat(wrapper.query(query, positionBound, queryConfig), equalTo(result));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldQueryVersionedStore() {
+        wrapVersionedStore();
+        when(versionedStore.query(query, positionBound, queryConfig)).thenReturn(result);
+
+        assertThat(wrapper.query(query, positionBound, queryConfig), equalTo(result));
+    }
+
+    @Test
+    public void shouldGetPositionForTimestampedStore() {
+        wrapTimestampedStore();
+        when(timestampedStore.getPosition()).thenReturn(position);
+
+        final Position actualPosition = wrapper.getPosition();
+
+        assertThat(actualPosition, equalTo(position));
+    }
+
+    @Test
+    public void shouldGetPositionForVersionedStore() {
+        wrapVersionedStore();
+        when(versionedStore.getPosition()).thenReturn(position);
+
+        final Position actualPosition = wrapper.getPosition();
+
+        assertThat(actualPosition, equalTo(position));
+    }
+
+    // ---- fixtures ----
+
+    /** Makes the mocked context hand out the timestamped store and builds the wrapper around it. */
+    private void wrapTimestampedStore() {
+        when(context.getStateStore(STORE_NAME)).thenReturn(timestampedStore);
+        wrapper = new KeyValueStoreWrapper<>(context, STORE_NAME);
+    }
+
+    /** Makes the mocked context hand out the versioned store and builds the wrapper around it. */
+    private void wrapVersionedStore() {
+        when(context.getStateStore(STORE_NAME)).thenReturn(versionedStore);
+        wrapper = new KeyValueStoreWrapper<>(context, STORE_NAME);
+    }
+}
\ No newline at end of file