This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 517b5d2b09e KAFKA-14491: [12/N] Relax requirement that KTable stores 
must be TimestampedKVStores (#13264)
517b5d2b09e is described below

commit 517b5d2b09ec22cf1734ba1e2d8be9ece5fb0365
Author: Victoria Xia <victoria....@confluent.io>
AuthorDate: Thu Mar 2 14:14:30 2023 -0800

    KAFKA-14491: [12/N] Relax requirement that KTable stores must be 
TimestampedKVStores (#13264)
    
    As part of introducing versioned key-value stores in KIP-889, we want to 
lift the existing DSL restriction that KTable stores are always 
TimestampedKeyValueStores to allow for KTable stores which are 
VersionedKeyValueStores instead. This PR lifts this restriction by replacing 
raw usages of TimestampedKeyValueStore with a new KeyValueStoreWrapper which 
supports either TimestampedKeyValueStore or VersionedKeyValueStore.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../kstream/internals/KStreamAggregate.java        |  14 +-
 .../streams/kstream/internals/KStreamReduce.java   |  14 +-
 .../streams/kstream/internals/KTableAggregate.java |  10 +-
 .../streams/kstream/internals/KTableFilter.java    |  10 +-
 .../streams/kstream/internals/KTableImpl.java      |  10 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |  11 +-
 .../streams/kstream/internals/KTableMapValues.java |  10 +-
 .../KTableMaterializedValueGetterSupplier.java     |   6 +-
 .../kstream/internals/KTablePassThrough.java       |   6 +-
 .../streams/kstream/internals/KTableReduce.java    |  10 +-
 .../streams/kstream/internals/KTableSource.java    |  10 +-
 .../internals/KTableSourceValueGetterSupplier.java |   6 +-
 .../kstream/internals/KTableTransformValues.java   |  10 +-
 .../internals/graph/KTableKTableJoinNode.java      |   9 +-
 .../kstream/internals/graph/StreamToTableNode.java |   3 +-
 .../internals/graph/TableProcessorNode.java        |   8 +-
 .../kstream/internals/graph/TableSourceNode.java   |   5 +-
 .../state/internals/KeyValueStoreWrapper.java      | 145 +++++++++
 .../state/internals/KeyValueStoreWrapperTest.java  | 356 +++++++++++++++++++++
 19 files changed, 573 insertions(+), 80 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 109160de507..27a3d488918 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -25,8 +25,8 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +63,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
 
 
     private class KStreamAggregateProcessor extends ContextualProcessor<KIn, 
VIn, KIn, Change<VAgg>> {
-        private TimestampedKeyValueStore<KIn, VAgg> store;
+        private KeyValueStoreWrapper<KIn, VAgg> store;
         private Sensor droppedRecordsSensor;
         private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
 
@@ -74,9 +74,9 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
                 Thread.currentThread().getName(),
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics());
-            store =  context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
-                store,
+                store.getStore(),
                 context,
                 new TimestampedCacheFlushListener<>(context),
                 sendOldValues);
@@ -118,7 +118,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
 
             newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-            store.put(record.key(), ValueAndTimestamp.make(newAgg, 
newTimestamp));
+            store.put(record.key(), newAgg, newTimestamp);
             tupleForwarder.maybeForward(
                 record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
                     .withTimestamp(newTimestamp));
@@ -141,11 +141,11 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
     }
 
     private class KStreamAggregateValueGetter implements 
KTableValueGetter<KIn, VAgg> {
-        private TimestampedKeyValueStore<KIn, VAgg> store;
+        private KeyValueStoreWrapper<KIn, VAgg> store;
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index e936855aac9..b801d2b60ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -24,8 +24,8 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +58,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
 
 
     private class KStreamReduceProcessor extends ContextualProcessor<K, V, K, 
Change<V>> {
-        private TimestampedKeyValueStore<K, V> store;
+        private KeyValueStoreWrapper<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private Sensor droppedRecordsSensor;
 
@@ -70,9 +70,9 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
-                store,
+                store.getStore(),
                 context,
                 new TimestampedCacheFlushListener<>(context),
                 sendOldValues);
@@ -112,7 +112,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
                 newTimestamp = Math.max(record.timestamp(), 
oldAggAndTimestamp.timestamp());
             }
 
-            store.put(record.key(), ValueAndTimestamp.make(newAgg, 
newTimestamp));
+            store.put(record.key(), newAgg, newTimestamp);
             tupleForwarder.maybeForward(
                 record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
                     .withTimestamp(newTimestamp));
@@ -136,11 +136,11 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
 
 
     private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
-        private TimestampedKeyValueStore<K, V> store;
+        private KeyValueStoreWrapper<K, V> store;
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index f10af467404..75442555cc2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -60,15 +60,15 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
     }
 
     private class KTableAggregateProcessor implements Processor<KIn, 
Change<VIn>, KIn, Change<VAgg>> {
-        private TimestampedKeyValueStore<KIn, VAgg> store;
+        private KeyValueStoreWrapper<KIn, VAgg> store;
         private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
-            store = (TimestampedKeyValueStore<KIn, VAgg>) 
context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
-                store,
+                store.getStore(),
                 context,
                 new TimestampedCacheFlushListener<>(context),
                 sendOldValues);
@@ -116,7 +116,7 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
             }
 
             // update the store with the new value
-            store.put(record.key(), ValueAndTimestamp.make(newAgg, 
newTimestamp));
+            store.put(record.key(), newAgg, newTimestamp);
             tupleForwarder.maybeForward(
                 record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
                     .withTimestamp(newTimestamp));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 99f50be1af2..6287aa40c55 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -88,16 +88,16 @@ class KTableFilter<KIn, VIn> implements 
KTableProcessorSupplier<KIn, VIn, KIn, V
 
     private class KTableFilterProcessor implements Processor<KIn, Change<VIn>, 
KIn, Change<VIn>> {
         private ProcessorContext<KIn, Change<VIn>> context;
-        private TimestampedKeyValueStore<KIn, VIn> store;
+        private KeyValueStoreWrapper<KIn, VIn> store;
         private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
 
         @Override
         public void init(final ProcessorContext<KIn, Change<VIn>> context) {
             this.context = context;
             if (queryableName != null) {
-                store = context.getStateStore(queryableName);
+                store = new KeyValueStoreWrapper<>(context, queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
-                    store,
+                    store.getStore(),
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
@@ -117,7 +117,7 @@ class KTableFilter<KIn, VIn> implements 
KTableProcessorSupplier<KIn, VIn, KIn, V
             }
 
             if (queryableName != null) {
-                store.put(key, ValueAndTimestamp.make(newValue, 
record.timestamp()));
+                store.put(key, newValue, record.timestamp());
                 tupleForwarder.maybeForward(record.withValue(new 
Change<>(newValue, oldValue)));
             } else {
                 context.forward(record.withValue(new Change<>(newValue, 
oldValue)));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e34ac2f5841..c2786257a74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -175,7 +175,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final Serde<K> keySerde;
         final Serde<V> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
+        final StoreBuilder<?> storeBuilder;
 
         if (materializedInternal != null) {
             // we actually do not need to generate store names at all since if 
it is not specified, we will not
@@ -290,7 +290,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<?> storeBuilder;
 
         if (materializedInternal != null) {
             // we actually do not need to generate store names at all since if 
it is not specified, we will not
@@ -445,7 +445,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<?> storeBuilder;
 
         if (materializedInternal != null) {
             // don't inherit parent value serde, since this operation may 
change the value type, more specifically:
@@ -750,7 +750,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<?> storeBuilder;
 
         if (materializedInternal != null) {
             if (materializedInternal.keySerde() == null) {
@@ -1270,7 +1270,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             materializedInternal.queryableStoreName()
         );
 
-        final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
+        final StoreBuilder<?> resultStore =
             new 
TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
 
         final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 72dbe40b7f1..2ed1dc42a97 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -20,12 +20,11 @@ import 
org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, K, V> {
 
@@ -98,7 +97,7 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
     }
 
     private class KTableKTableJoinMergeProcessor extends 
ContextualProcessor<K, Change<V>, K, Change<V>> {
-        private TimestampedKeyValueStore<K, V> store;
+        private KeyValueStoreWrapper<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
@@ -106,9 +105,9 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
         public void init(final ProcessorContext<K, Change<V>> context) {
             super.init(context);
             if (queryableName != null) {
-                store = (TimestampedKeyValueStore<K, V>) 
context.getStateStore(queryableName);
+                store = new KeyValueStoreWrapper<>(context, queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
-                    store,
+                    store.getStore(),
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
@@ -118,7 +117,7 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
         @Override
         public void process(final Record<K, Change<V>> record) {
             if (queryableName != null) {
-                store.put(record.key(), 
ValueAndTimestamp.make(record.value().newValue, record.timestamp()));
+                store.put(record.key(), record.value().newValue, 
record.timestamp());
                 tupleForwarder.maybeForward(record);
             } else {
                 if (sendOldValues) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index acea6c0cf2c..f0897dcdb05 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -106,16 +106,16 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
 
     private class KTableMapValuesProcessor implements Processor<KIn, 
Change<VIn>, KIn, Change<VOut>> {
         private ProcessorContext<KIn, Change<VOut>> context;
-        private TimestampedKeyValueStore<KIn, VOut> store;
+        private KeyValueStoreWrapper<KIn, VOut> store;
         private TimestampedTupleForwarder<KIn, VOut> tupleForwarder;
 
         @Override
         public void init(final ProcessorContext<KIn, Change<VOut>> context) {
             this.context = context;
             if (queryableName != null) {
-                store = context.getStateStore(queryableName);
+                store = new KeyValueStoreWrapper<>(context, queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
-                    store,
+                    store.getStore(),
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
@@ -128,7 +128,7 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
             final VOut oldValue = computeOldValue(record.key(), 
record.value());
 
             if (queryableName != null) {
-                store.put(record.key(), ValueAndTimestamp.make(newValue, 
record.timestamp()));
+                store.put(record.key(), newValue, record.timestamp());
                 tupleForwarder.maybeForward(record.withValue(new 
Change<>(newValue, oldValue)));
             } else {
                 context.forward(record.withValue(new Change<>(newValue, 
oldValue)));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index e7570366f24..ba7e65081d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 public class KTableMaterializedValueGetterSupplier<K, V> implements 
KTableValueGetterSupplier<K, V> {
     private final String storeName;
@@ -37,11 +37,11 @@ public class KTableMaterializedValueGetterSupplier<K, V> 
implements KTableValueG
     }
 
     private class KTableMaterializedValueGetter implements 
KTableValueGetter<K, V> {
-        private TimestampedKeyValueStore<K, V> store;
+        private KeyValueStoreWrapper<K, V> store;
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
index a28b973d75f..91fe0e4a277 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Collection;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 public class KTablePassThrough<KIn, VIn> implements 
KTableProcessorSupplier<KIn, VIn, KIn, VIn> {
     private final Collection<KStreamAggProcessorSupplier> parents;
@@ -79,11 +79,11 @@ public class KTablePassThrough<KIn, VIn> implements 
KTableProcessorSupplier<KIn,
     }
 
     private class KTablePassThroughValueGetter implements 
KTableValueGetter<KIn, VIn> {
-        private TimestampedKeyValueStore<KIn, VIn> store;
+        private KeyValueStoreWrapper<KIn, VIn> store;
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 08d04494957..ef40638dd28 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -54,15 +54,15 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, K, V> {
 
     private class KTableReduceProcessor implements Processor<K, Change<V>, K, 
Change<V>> {
 
-        private TimestampedKeyValueStore<K, V> store;
+        private KeyValueStoreWrapper<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext<K, Change<V>> context) {
-            store = (TimestampedKeyValueStore<K, V>) 
context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
-                store,
+                store.getStore(),
                 context,
                 new TimestampedCacheFlushListener<>(context),
                 sendOldValues);
@@ -106,7 +106,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, K, V> {
             }
 
             // update the store with the new value
-            store.put(record.key(), ValueAndTimestamp.make(newAgg, 
newTimestamp));
+            store.put(record.key(), newAgg, newTimestamp);
             tupleForwarder.maybeForward(
                 record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
                     .withTimestamp(newTimestamp));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 6de8ede316b..7856c91d839 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -26,8 +26,8 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +76,7 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
     private class KTableSourceProcessor implements Processor<KIn, VIn, KIn, 
Change<VIn>> {
 
         private ProcessorContext<KIn, Change<VIn>> context;
-        private TimestampedKeyValueStore<KIn, VIn> store;
+        private KeyValueStoreWrapper<KIn, VIn> store;
         private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
         private Sensor droppedRecordsSensor;
 
@@ -88,9 +88,9 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
             droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(),
                 context.taskId().toString(), metrics);
             if (queryableName != null) {
-                store = context.getStateStore(queryableName);
+                store = new KeyValueStoreWrapper<>(context, queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
-                    store,
+                    store.getStore(),
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
@@ -146,7 +146,7 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
                 } else {
                     oldValue = null;
                 }
-                store.put(record.key(), ValueAndTimestamp.make(record.value(), 
record.timestamp()));
+                store.put(record.key(), record.value(), record.timestamp());
                 tupleForwarder.maybeForward(record.withValue(new 
Change<>(record.value(), oldValue)));
             } else {
                 context.forward(record.withValue(new Change<>(record.value(), 
null)));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 94701318fa2..1c340943354 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -18,8 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 public class KTableSourceValueGetterSupplier<K, V> implements 
KTableValueGetterSupplier<K, V> {
     private final String storeName;
@@ -38,10 +38,10 @@ public class KTableSourceValueGetterSupplier<K, V> 
implements KTableValueGetterS
     }
 
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
-        private TimestampedKeyValueStore<K, V> store = null;
+        private KeyValueStoreWrapper<K, V> store;
 
         public void init(final ProcessorContext<?, ?> context) {
-            store = context.getStateStore(storeName);
+            store = new KeyValueStoreWrapper<>(context, storeName);
         }
 
         public ValueAndTimestamp<V> get(final K key) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 6aac1d4183b..5975571f0fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -26,10 +26,10 @@ import org.apache.kafka.streams.processor.api.Record;
 import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Objects;
+import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -90,7 +90,7 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
 
     private class KTableTransformValuesProcessor extends 
ContextualProcessor<K, Change<V>, K, Change<VOut>> {
         private final ValueTransformerWithKey<? super K, ? super V, ? extends 
VOut> valueTransformer;
-        private TimestampedKeyValueStore<K, VOut> store;
+        private KeyValueStoreWrapper<K, VOut> store;
         private TimestampedTupleForwarder<K, VOut> tupleForwarder;
 
         private KTableTransformValuesProcessor(final ValueTransformerWithKey<? 
super K, ? super V, ? extends VOut> valueTransformer) {
@@ -103,9 +103,9 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
             final InternalProcessorContext<K, Change<VOut>> 
internalProcessorContext = (InternalProcessorContext<K, Change<VOut>>) context;
             valueTransformer.init(new 
ForwardingDisabledProcessorContext(internalProcessorContext));
             if (queryableName != null) {
-                store = context.getStateStore(queryableName);
+                store = new KeyValueStoreWrapper<>(context, queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
-                    store,
+                    store.getStore(),
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
@@ -121,7 +121,7 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
                 context().forward(record.withValue(new Change<>(newValue, 
oldValue)));
             } else {
                 final VOut oldValue = sendOldValues ? 
getValueOrNull(store.get(record.key())) : null;
-                store.put(record.key(), ValueAndTimestamp.make(newValue, 
record.timestamp()));
+                store.put(record.key(), newValue, record.timestamp());
                 tupleForwarder.maybeForward(record.withValue(new 
Change<>(newValue, oldValue)));
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 2772a668108..dec7b437b2c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -24,7 +24,6 @@ import 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
 
@@ -37,7 +36,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
     private final Serde<VR> valueSerde;
     private final String[] joinThisStoreNames;
     private final String[] joinOtherStoreNames;
-    private final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+    private final StoreBuilder<?> storeBuilder;
 
     KTableKTableJoinNode(final String nodeName,
                          final ProcessorParameters<K, Change<V1>, ?, ?> 
joinThisProcessorParameters,
@@ -49,7 +48,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                          final Serde<VR> valueSerde,
                          final String[] joinThisStoreNames,
                          final String[] joinOtherStoreNames,
-                         final StoreBuilder<TimestampedKeyValueStore<K, VR>> 
storeBuilder) {
+                         final StoreBuilder<?> storeBuilder) {
 
         super(nodeName,
             null,
@@ -148,7 +147,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         private String[] joinThisStoreNames;
         private String[] joinOtherStoreNames;
         private String queryableStoreName;
-        private StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
+        private StoreBuilder<?> storeBuilder;
 
         private KTableKTableJoinNodeBuilder() {
         }
@@ -203,7 +202,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
withStoreBuilder(final StoreBuilder<TimestampedKeyValueStore<K, VR>> 
storeBuilder) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
withStoreBuilder(final StoreBuilder<?> storeBuilder) {
             this.storeBuilder = storeBuilder;
             return this;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 8c071dad229..6d79deb8105 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -24,7 +24,6 @@ import 
org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMateri
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 /**
  * Represents a KTable convert From KStream
@@ -53,7 +52,7 @@ public class StreamToTableNode<K, V> extends GraphNode {
     @SuppressWarnings("unchecked")
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+        final StoreBuilder<?> storeBuilder =
             new 
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
 
         final String processorName = processorParameters.processorName();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index c522b5a9f3f..5b25fb682f1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals.graph;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
 import java.util.Objects;
@@ -28,19 +27,18 @@ import java.util.Objects;
 public class TableProcessorNode<K, V> extends GraphNode {
 
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
-    private final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
+    private final StoreBuilder<?> storeBuilder;
     private final String[] storeNames;
 
     public TableProcessorNode(final String nodeName,
                               final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                              final StoreBuilder<TimestampedKeyValueStore<K, 
V>> storeBuilder) {
+                              final StoreBuilder<?> storeBuilder) {
         this(nodeName, processorParameters, storeBuilder, null);
     }
 
     public TableProcessorNode(final String nodeName,
                               final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                              // TODO KIP-300: we are enforcing this as a 
keyvalue store, but it should go beyond any type of stores
-                              final StoreBuilder<TimestampedKeyValueStore<K, 
V>> storeBuilder,
+                              final StoreBuilder<?> storeBuilder,
                               final String[] storeNames) {
         super(nodeName);
         this.processorParameters = processorParameters;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 52413b9b61b..e218263e9b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -27,7 +27,6 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Collections;
 
@@ -94,9 +93,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
             throw new IllegalStateException("A table source node must have a 
single topic as input");
         }
 
-        // TODO: we assume source KTables can only be timestamped-key-value 
stores for now.
-        // should be expanded for other types of stores as well.
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+        final StoreBuilder<?> storeBuilder =
             new 
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
 
         if (isGlobalKTable) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
new file mode 100644
index 00000000000..9c2920570aa
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All 
such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link 
VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private VersionedKeyValueStore<K, V> versionedStore = null;
+
+    // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
+    // to simplify implementation for methods which do not depend on store 
type.
+    private StateStore store = null;
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        try {
+            // first try timestamped store
+            timestampedStore = context.getStateStore(storeName);
+            store = timestampedStore;
+            return;
+        } catch (final ClassCastException e) {
+            // ignore since could be versioned store instead
+        }
+
+        try {
+            // next try versioned store
+            versionedStore = context.getStateStore(storeName);
+            store = versionedStore;
+        } catch (final ClassCastException e) {
+            store = context.getStateStore(storeName);
+            final String storeType = store == null ? "null" : 
store.getClass().getName();
+            throw new InvalidStateStoreException("KTable source state store 
must implement either "
+                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " 
+ storeType);
+        }
+    }
+
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp());
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, 
timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+    }
+
+    public StateStore getStore() {
+        return store;
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Deprecated
+    @Override
+    public void init(final org.apache.kafka.streams.processor.ProcessorContext 
context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        store.flush();
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return store.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
+        return store.query(query, positionBound, config);
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
new file mode 100644
index 00000000000..869d4858f75
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
@@ -0,0 +1,356 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state.internals;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link KeyValueStoreWrapper}. Each behavior is verified twice: once with the
 * wrapper built over a (mocked) {@link TimestampedKeyValueStore} and once over a (mocked)
 * {@link VersionedKeyValueStore}, to confirm the wrapper delegates correctly to both store types.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KeyValueStoreWrapperTest {

    private static final String STORE_NAME = "kvStore";
    private static final String KEY = "k";
    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP
        = ValueAndTimestamp.make("v", 8L);

    @Mock
    private TimestampedKeyValueStore<String, String> timestampedStore;
    @Mock
    private VersionedKeyValueStore<String, String> versionedStore;
    @Mock
    private ProcessorContext context;
    @Mock
    private Query query;
    @Mock
    private PositionBound positionBound;
    @Mock
    private QueryConfig queryConfig;
    @Mock
    private QueryResult result;
    @Mock
    private Position position;

    // wrapper under test; (re)built by the given...() helpers at the bottom of this class
    private KeyValueStoreWrapper<String, String> wrapper;

    @Test
    public void shouldThrowOnNonTimestampedOrVersionedStore() {
        // a plain KeyValueStore is neither timestamped nor versioned, so construction must fail
        when(context.getStateStore(STORE_NAME)).thenReturn(mock(KeyValueStore.class));

        assertThrows(InvalidStateStoreException.class, () -> new KeyValueStoreWrapper<>(context, STORE_NAME));
    }

    @Test
    public void shouldGetFromTimestampedStore() {
        givenWrapperWithTimestampedStore();
        when(timestampedStore.get(KEY)).thenReturn(VALUE_AND_TIMESTAMP);

        assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP));
    }

    @Test
    public void shouldGetFromVersionedStore() {
        givenWrapperWithVersionedStore();
        // the wrapper must adapt a VersionedRecord into an equivalent ValueAndTimestamp
        when(versionedStore.get(KEY)).thenReturn(
            new VersionedRecord<>(
                VALUE_AND_TIMESTAMP.value(),
                VALUE_AND_TIMESTAMP.timestamp())
        );

        assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP));
    }

    @Test
    public void shouldGetNullFromTimestampedStore() {
        givenWrapperWithTimestampedStore();
        when(timestampedStore.get(KEY)).thenReturn(null);

        assertThat(wrapper.get(KEY), nullValue());
    }

    @Test
    public void shouldGetNullFromVersionedStore() {
        givenWrapperWithVersionedStore();
        when(versionedStore.get(KEY)).thenReturn(null);

        assertThat(wrapper.get(KEY), nullValue());
    }

    @Test
    public void shouldPutToTimestampedStore() {
        givenWrapperWithTimestampedStore();

        wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp());

        // timestamped stores take (key, ValueAndTimestamp) rather than a separate timestamp arg
        verify(timestampedStore).put(KEY, VALUE_AND_TIMESTAMP);
    }

    @Test
    public void shouldPutToVersionedStore() {
        givenWrapperWithVersionedStore();

        wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp());

        verify(versionedStore).put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp());
    }

    @Test
    public void shouldPutNullToTimestampedStore() {
        givenWrapperWithTimestampedStore();

        wrapper.put(KEY, null, VALUE_AND_TIMESTAMP.timestamp());

        // note: a null value is forwarded as a null ValueAndTimestamp (tombstone), not wrapped
        verify(timestampedStore).put(KEY, null);
    }

    @Test
    public void shouldPutNullToVersionedStore() {
        givenWrapperWithVersionedStore();

        wrapper.put(KEY, null, VALUE_AND_TIMESTAMP.timestamp());

        verify(versionedStore).put(KEY, null, VALUE_AND_TIMESTAMP.timestamp());
    }

    @Test
    public void shouldGetTimestampedStore() {
        givenWrapperWithTimestampedStore();

        assertThat(wrapper.getStore(), equalTo(timestampedStore));
    }

    @Test
    public void shouldGetVersionedStore() {
        givenWrapperWithVersionedStore();

        assertThat(wrapper.getStore(), equalTo(versionedStore));
    }

    @Test
    public void shouldGetNameForTimestampedStore() {
        givenWrapperWithTimestampedStore();
        when(timestampedStore.name()).thenReturn(STORE_NAME);

        assertThat(wrapper.name(), equalTo(STORE_NAME));
    }

    @Test
    public void shouldGetNameForVersionedStore() {
        givenWrapperWithVersionedStore();
        when(versionedStore.name()).thenReturn(STORE_NAME);

        assertThat(wrapper.name(), equalTo(STORE_NAME));
    }

    // @Deprecated on the test method avoids a deprecation warning for exercising the
    // deprecated init(ProcessorContext, StateStore) overload
    @Deprecated
    @Test
    public void shouldDeprecatedInitTimestampedStore() {
        givenWrapperWithTimestampedStore();
        final org.apache.kafka.streams.processor.ProcessorContext mockContext
            = mock(org.apache.kafka.streams.processor.ProcessorContext.class);

        wrapper.init(mockContext, wrapper);

        verify(timestampedStore).init(mockContext, wrapper);
    }

    @Deprecated
    @Test
    public void shouldDeprecatedInitVersionedStore() {
        givenWrapperWithVersionedStore();
        final org.apache.kafka.streams.processor.ProcessorContext mockContext
            = mock(org.apache.kafka.streams.processor.ProcessorContext.class);

        wrapper.init(mockContext, wrapper);

        verify(versionedStore).init(mockContext, wrapper);
    }

    @Test
    public void shouldInitTimestampedStore() {
        givenWrapperWithTimestampedStore();
        final StateStoreContext mockContext = mock(StateStoreContext.class);

        wrapper.init(mockContext, wrapper);

        verify(timestampedStore).init(mockContext, wrapper);
    }

    @Test
    public void shouldInitVersionedStore() {
        givenWrapperWithVersionedStore();
        final StateStoreContext mockContext = mock(StateStoreContext.class);

        wrapper.init(mockContext, wrapper);

        verify(versionedStore).init(mockContext, wrapper);
    }

    @Test
    public void shouldFlushTimestampedStore() {
        givenWrapperWithTimestampedStore();

        wrapper.flush();

        verify(timestampedStore).flush();
    }

    @Test
    public void shouldFlushVersionedStore() {
        givenWrapperWithVersionedStore();

        wrapper.flush();

        verify(versionedStore).flush();
    }

    @Test
    public void shouldCloseTimestampedStore() {
        givenWrapperWithTimestampedStore();

        wrapper.close();

        verify(timestampedStore).close();
    }

    @Test
    public void shouldCloseVersionedStore() {
        givenWrapperWithVersionedStore();

        wrapper.close();

        verify(versionedStore).close();
    }

    @Test
    public void shouldReturnPersistentForTimestampedStore() {
        givenWrapperWithTimestampedStore();

        // test "persistent = true"
        when(timestampedStore.persistent()).thenReturn(true);
        assertThat(wrapper.persistent(), equalTo(true));

        // test "persistent = false"
        when(timestampedStore.persistent()).thenReturn(false);
        assertThat(wrapper.persistent(), equalTo(false));
    }

    @Test
    public void shouldReturnPersistentForVersionedStore() {
        givenWrapperWithVersionedStore();

        // test "persistent = true"
        when(versionedStore.persistent()).thenReturn(true);
        assertThat(wrapper.persistent(), equalTo(true));

        // test "persistent = false"
        when(versionedStore.persistent()).thenReturn(false);
        assertThat(wrapper.persistent(), equalTo(false));
    }

    @Test
    public void shouldReturnIsOpenForTimestampedStore() {
        givenWrapperWithTimestampedStore();

        // test "isOpen = true"
        when(timestampedStore.isOpen()).thenReturn(true);
        assertThat(wrapper.isOpen(), equalTo(true));

        // test "isOpen = false"
        when(timestampedStore.isOpen()).thenReturn(false);
        assertThat(wrapper.isOpen(), equalTo(false));
    }

    @Test
    public void shouldReturnIsOpenForVersionedStore() {
        givenWrapperWithVersionedStore();

        // test "isOpen = true"
        when(versionedStore.isOpen()).thenReturn(true);
        assertThat(wrapper.isOpen(), equalTo(true));

        // test "isOpen = false"
        when(versionedStore.isOpen()).thenReturn(false);
        assertThat(wrapper.isOpen(), equalTo(false));
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldQueryTimestampedStore() {
        givenWrapperWithTimestampedStore();
        when(timestampedStore.query(query, positionBound, queryConfig)).thenReturn(result);

        assertThat(wrapper.query(query, positionBound, queryConfig), equalTo(result));
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldQueryVersionedStore() {
        givenWrapperWithVersionedStore();
        when(versionedStore.query(query, positionBound, queryConfig)).thenReturn(result);

        assertThat(wrapper.query(query, positionBound, queryConfig), equalTo(result));
    }

    @Test
    public void shouldGetPositionForTimestampedStore() {
        givenWrapperWithTimestampedStore();
        when(timestampedStore.getPosition()).thenReturn(position);

        assertThat(wrapper.getPosition(), equalTo(position));
    }

    @Test
    public void shouldGetPositionForVersionedStore() {
        givenWrapperWithVersionedStore();
        when(versionedStore.getPosition()).thenReturn(position);

        assertThat(wrapper.getPosition(), equalTo(position));
    }

    // builds the wrapper over the mocked timestamped store
    private void givenWrapperWithTimestampedStore() {
        when(context.getStateStore(STORE_NAME)).thenReturn(timestampedStore);
        wrapper = new KeyValueStoreWrapper<>(context, STORE_NAME);
    }

    // builds the wrapper over the mocked versioned store
    private void givenWrapperWithVersionedStore() {
        when(context.getStateStore(STORE_NAME)).thenReturn(versionedStore);
        wrapper = new KeyValueStoreWrapper<>(context, STORE_NAME);
    }
}
\ No newline at end of file

Reply via email to