This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new b3b0ed6  KAFKA-7916: Unify store wrapping code for clarity (#6255)
b3b0ed6 is described below

commit b3b0ed637238375739c76fc5237f8c8414453875
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Thu Feb 14 10:38:01 2019 -0600

    KAFKA-7916: Unify store wrapping code for clarity (#6255)
    
    Refactor internal store wrapping for improved maintainability.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../streams/kstream/internals/TupleForwarder.java  |   4 +-
 .../processor/internals/AbstractStateManager.java  |   9 ++
 .../internals/GlobalStateManagerImpl.java          |  10 +-
 .../processor/internals/ProcessorContextImpl.java  |  44 ++++-----
 .../processor/internals/ProcessorStateManager.java |   7 +-
 .../state/internals/CachingKeyValueStore.java      |  50 +++-------
 .../state/internals/CachingSessionStore.java       |  30 +++---
 .../state/internals/CachingWindowStore.java        |  32 +++---
 .../internals/ChangeLoggingKeyValueBytesStore.java |  28 +++---
 .../internals/ChangeLoggingSessionBytesStore.java  |  20 ++--
 .../internals/ChangeLoggingWindowBytesStore.java   |  22 ++---
 .../state/internals/MeteredKeyValueStore.java      |  38 ++++----
 .../state/internals/MeteredSessionStore.java       |  18 ++--
 .../state/internals/MeteredWindowStore.java        |  20 ++--
 .../streams/state/internals/RecordConverter.java   |  28 ------
 .../streams/state/internals/RecordConverters.java  |  60 ++++++++++++
 .../state/internals/RocksDBSessionStore.java       |  18 ++--
 .../state/internals/RocksDBWindowStore.java        |  20 ++--
 .../streams/state/internals/WrappedStateStore.java | 105 +++++++++-----------
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   2 +-
 .../internals/GlobalStateManagerImplTest.java      | 108 ++++-----------------
 .../internals/KeyValueStoreMaterializerTest.java   |  15 ++-
 .../processor/internals/StateRestorerTest.java     |   5 +-
 .../internals/StoreChangelogReaderTest.java        |  61 ++++++------
 .../state/internals/CachingKeyValueStoreTest.java  |   5 +-
 .../state/internals/KeyValueStoreBuilderTest.java  |  16 +--
 .../state/internals/SessionStoreBuilderTest.java   |  16 +--
 .../state/internals/WindowStoreBuilderTest.java    |  16 +--
 28 files changed, 347 insertions(+), 460 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index 127057f..0862e47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -49,10 +49,10 @@ class TupleForwarder<K, V> {
         if (store instanceof CachedStateStore) {
             return (CachedStateStore) store;
         } else if (store instanceof WrappedStateStore) {
-            StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+            StateStore wrapped = ((WrappedStateStore) store).wrapped();
 
             while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
-                wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+                wrapped = ((WrappedStateStore) wrapped).wrapped();
             }
 
             if (!(wrapped instanceof CachedStateStore)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index 66ddec9..36482aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -32,6 +33,10 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.state.internals.RecordConverters.identity;
+import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
+
 abstract class AbstractStateManager implements StateManager {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
@@ -50,6 +55,10 @@ abstract class AbstractStateManager implements StateManager {
         this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
     }
 
+    static RecordConverter converterForStore(final StateStore store) {
+        return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
+    }
+
     public void reinitializeStateStoresForPartitions(final Logger log,
                                                      final Map<String, StateStore> stateStores,
                                                      final Map<String, String> storeToChangelogTopic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index efedbec..48319a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -32,9 +32,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.internals.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -198,17 +196,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             }
         }
         try {
-            final StateStore stateStore =
-                store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
-            final RecordConverter recordConverter =
-                stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;
-
             restoreState(
                 stateRestoreCallback,
                 topicPartitions,
                 highWatermarks,
                 store.name(),
-                recordConverter);
+                converterForStore(store)
+            );
             globalStores.put(store.name(), store);
         } finally {
             globalConsumer.unsubscribe();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 4409a95..4bf66e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import java.time.Duration;
 import java.util.List;
@@ -214,18 +214,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return streamTimeSupplier.get();
     }
 
-    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
+    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends WrappedStateStore<T> {
         static final String ERROR_MESSAGE = "Global store is read only";
 
         private StateStoreReadOnlyDecorator(final T inner) {
             super(inner);
         }
 
-        @SuppressWarnings("unchecked")
-        T getInner() {
-            return (T) wrappedStore();
-        }
-
         @Override
         public void flush() {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -253,23 +248,23 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Override
         public V get(final K key) {
-            return getInner().get(key);
+            return wrapped().get(key);
         }
 
         @Override
         public KeyValueIterator<K, V> range(final K from,
                                             final K to) {
-            return getInner().range(from, to);
+            return wrapped().range(from, to);
         }
 
         @Override
         public KeyValueIterator<K, V> all() {
-            return getInner().all();
+            return wrapped().all();
         }
 
         @Override
         public long approximateNumEntries() {
-            return getInner().approximateNumEntries();
+            return wrapped().approximateNumEntries();
         }
 
         @Override
@@ -319,7 +314,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         @Override
         public V fetch(final K key,
                        final long time) {
-            return getInner().fetch(key, time);
+            return wrapped().fetch(key, time);
         }
 
         @Deprecated
@@ -327,7 +322,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         public WindowStoreIterator<V> fetch(final K key,
                                             final long timeFrom,
                                             final long timeTo) {
-            return getInner().fetch(key, timeFrom, timeTo);
+            return wrapped().fetch(key, timeFrom, timeTo);
         }
 
         @Deprecated
@@ -336,19 +331,19 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                                       final K to,
                                                       final long timeFrom,
                                                       final long timeTo) {
-            return getInner().fetch(from, to, timeFrom, timeTo);
+            return wrapped().fetch(from, to, timeFrom, timeTo);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, V> all() {
-            return getInner().all();
+            return wrapped().all();
         }
 
         @Deprecated
         @Override
         public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                          final long timeTo) {
-            return getInner().fetchAll(timeFrom, timeTo);
+            return wrapped().fetchAll(timeFrom, timeTo);
         }
     }
 
@@ -364,7 +359,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                                final long earliestSessionEndTime,
                                                                final long latestSessionStartTime) {
-            return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
@@ -372,7 +367,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                                                final K keyTo,
                                                                final long earliestSessionEndTime,
                                                                final long latestSessionStartTime) {
-            return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
@@ -388,33 +383,28 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Override
         public AGG fetchSession(final K key, final long startTime, final long endTime) {
-            return getInner().fetchSession(key, startTime, endTime);
+            return wrapped().fetchSession(key, startTime, endTime);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-            return getInner().fetch(key);
+            return wrapped().fetch(key);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
                                                         final K to) {
-            return getInner().fetch(from, to);
+            return wrapped().fetch(from, to);
         }
     }
 
-    private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore {
+    private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends WrappedStateStore<T> {
         static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
 
         private StateStoreReadWriteDecorator(final T inner) {
             super(inner);
         }
 
-        @SuppressWarnings("unchecked")
-        T wrapped() {
-            return (T) super.wrappedStore();
-        }
-
         @Override
         public void init(final ProcessorContext context,
                          final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 468d3d6..f75f185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -23,10 +23,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.internals.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -134,10 +132,7 @@ public class ProcessorStateManager extends AbstractStateManager {
 
         final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
 
-        final StateStore stateStore =
-            store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
-        final RecordConverter recordConverter =
-            stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;
+        final RecordConverter recordConverter = converterForStore(store);
 
         if (isStandby) {
             log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 992466c..4ccb2a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -34,9 +34,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
+class CachingKeyValueStore<K, V> extends WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
 
-    private final KeyValueStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private CacheFlushListener<K, V> flushListener;
@@ -52,7 +51,6 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
                          final Serde<K> keySerde,
                          final Serde<V> valueSerde) {
         super(underlying);
-        this.underlying = underlying;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
     }
@@ -61,7 +59,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
     public void init(final ProcessorContext context,
                      final StateStore root) {
         initInternal(context);
-        underlying.init(context, root);
+        super.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
         streamThread = Thread.currentThread();
@@ -70,12 +68,12 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
     @SuppressWarnings("unchecked")
     private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
-        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.cache = this.context.getCache();
-        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
+        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
                 putAndMaybeForward(entry, (InternalProcessorContext) context);
@@ -87,7 +85,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
                                     final InternalProcessorContext context) {
         if (flushListener != null) {
             final byte[] newValueBytes = entry.newValue();
-            final byte[] oldValueBytes = newValueBytes == null || 
sendOldValues ? underlying.get(entry.key()) : null;
+            final byte[] oldValueBytes = newValueBytes == null || 
sendOldValues ? wrapped().get(entry.key()) : null;
 
             // this is an optimization: if this key did not exist in 
underlying store and also not in the cache,
             // we can skip flushing to downstream as well as writing to 
underlying store
@@ -96,7 +94,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
                 final V newValue = newValueBytes != null ? 
serdes.valueFrom(newValueBytes) : null;
                 final V oldValue = sendOldValues && oldValueBytes != null ? 
serdes.valueFrom(oldValueBytes) : null;
                 // we need to get the old values if needed, and then put to 
store, and then flush
-                underlying.put(entry.key(), entry.newValue());
+                wrapped().put(entry.key(), entry.newValue());
 
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
@@ -111,7 +109,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
                 }
             }
         } else {
-            underlying.put(entry.key(), entry.newValue());
+            wrapped().put(entry.key(), entry.newValue());
         }
     }
 
@@ -127,7 +125,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
         lock.writeLock().lock();
         try {
             cache.flush(cacheName);
-            underlying.flush();
+            super.flush();
         } finally {
             lock.writeLock().unlock();
         }
@@ -139,7 +137,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
             flush();
         } finally {
             try {
-                underlying.close();
+                super.close();
             } finally {
                 cache.close(cacheName);
             }
@@ -147,16 +145,6 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public boolean persistent() {
-        return underlying.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return underlying.isOpen();
-    }
-
-    @Override
     public byte[] get(final Bytes key) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
@@ -180,7 +168,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
             entry = cache.get(cacheName, key);
         }
         if (entry == null) {
-            final byte[] rawValue = underlying.get(key);
+            final byte[] rawValue = wrapped().get(key);
             if (rawValue == null) {
                 return null;
             }
@@ -199,7 +187,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                  final Bytes to) {
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
underlying.range(from, to);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().range(from, to);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(cacheName, from, to);
         return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator);
     }
@@ -207,7 +195,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>(this.name(), underlying.all());
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().all());
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.all(cacheName);
         return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator);
     }
@@ -217,7 +205,7 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
         validateStoreOpen();
         lock.readLock().lock();
         try {
-            return underlying.approximateNumEntries();
+            return wrapped().approximateNumEntries();
         } finally {
             lock.readLock().unlock();
         }
@@ -300,16 +288,4 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
         putInternal(key, null);
         return v;
     }
-
-    KeyValueStore<Bytes, byte[]> underlying() {
-        return underlying;
-    }
-
-    @Override
-    public StateStore inner() {
-        if (underlying instanceof WrappedStateStore) {
-            return ((WrappedStateStore) underlying).inner();
-        }
-        return underlying;
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 1c5c2f2..67a1588 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -30,9 +30,8 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Objects;
 
-class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> {
+class CachingSessionStore<K, AGG> extends WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> {
 
-    private final SessionStore<Bytes, byte[]> bytesStore;
     private final SessionKeySchema keySchema;
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
@@ -50,7 +49,6 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
                         final Serde<AGG> aggSerde,
                         final long segmentInterval) {
         super(bytesStore);
-        this.bytesStore = bytesStore;
         this.keySerde = keySerde;
         this.aggSerde = aggSerde;
         this.keySchema = new SessionKeySchema();
@@ -60,7 +58,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
     public void init(final ProcessorContext context, final StateStore root) {
         topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
         initInternal((InternalProcessorContext) context);
-        bytesStore.init(context, root);
+        super.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
@@ -72,7 +70,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
-        cacheName = context.taskId() + "-" + bytesStore.name();
+        cacheName = context.taskId() + "-" + name();
         cache = context.getCache();
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
@@ -89,9 +87,9 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
bytesStore.findSessions(key,
-                                                                               
                 earliestSessionEndTime,
-                                                                               
                 latestSessionStartTime);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().findSessions(key,
+                                                                               
                earliestSessionEndTime,
+                                                                               
                latestSessionStartTime);
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key,
                                                                              
key,
                                                                              
earliestSessionEndTime,
@@ -111,7 +109,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
bytesStore.findSessions(
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().findSessions(
             keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime
         );
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(keyFrom,
@@ -149,13 +147,13 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         if (cache == null) {
-            return bytesStore.fetchSession(key, startTime, endTime);
+            return wrapped().fetchSession(key, startTime, endTime);
         } else {
             final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, 
endTime);
             final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
             final LRUCacheEntry entry = cache.get(cacheName, cacheKey);
             if (entry == null) {
-                return bytesStore.fetchSession(key, startTime, endTime);
+                return wrapped().fetchSession(key, startTime, endTime);
             } else {
                 return entry.value();
             }
@@ -181,7 +179,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         if (flushListener != null) {
             final byte[] newValueBytes = entry.newValue();
             final byte[] oldValueBytes = newValueBytes == null || 
sendOldValues ?
-                bytesStore.fetchSession(bytesKey.key(), 
bytesKey.window().start(), bytesKey.window().end()) : null;
+                wrapped().fetchSession(bytesKey.key(), 
bytesKey.window().start(), bytesKey.window().end()) : null;
 
             // this is an optimization: if this key did not exist in 
underlying store and also not in the cache,
             // we can skip flushing to downstream as well as writing to 
underlying store
@@ -190,7 +188,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
                 final AGG newValue = newValueBytes != null ? 
serdes.valueFrom(newValueBytes) : null;
                 final AGG oldValue = sendOldValues && oldValueBytes != null ? 
serdes.valueFrom(oldValueBytes) : null;
                 // we need to get the old values if needed, and then put to 
store, and then flush
-                bytesStore.put(bytesKey, entry.newValue());
+                wrapped().put(bytesKey, entry.newValue());
 
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
@@ -205,19 +203,19 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
                 }
             }
         } else {
-            bytesStore.put(bytesKey, entry.newValue());
+            wrapped().put(bytesKey, entry.newValue());
         }
     }
 
     public void flush() {
         cache.flush(cacheName);
-        bytesStore.flush();
+        super.flush();
     }
 
     public void close() {
         flush();
         cache.close(cacheName);
-        bytesStore.close();
+        super.close();
     }
 
     public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> 
flushListener,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 53d02dd..bc82cc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -30,9 +30,8 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore 
implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
+class CachingWindowStore<K, V> extends WrappedStateStore<WindowStore<Bytes, 
byte[]>> implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, 
V> {
 
-    private final WindowStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final long windowSize;
@@ -55,7 +54,6 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
                        final long windowSize,
                        final long segmentInterval) {
         super(underlying);
-        this.underlying = underlying;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.windowSize = windowSize;
@@ -65,13 +63,13 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         initInternal((InternalProcessorContext) context);
-        underlying.init(context, root);
+        super.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
     private void initInternal(final InternalProcessorContext context) {
         this.context = context;
-        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
underlying.name());
+        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
         serdes = new StateSerdes<>(topic,
                                    keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
                                    valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
@@ -79,7 +77,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         bytesSerdes = new StateSerdes<>(topic,
                                         Serdes.Bytes(),
                                         Serdes.ByteArray());
-        name = context.taskId() + "-" + underlying.name();
+        name = context.taskId() + "-" + name();
         cache = this.context.getCache();
 
         cache.addDirtyEntryFlushListener(name, entries -> {
@@ -97,7 +95,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         final Bytes key = windowedKeyBytes.key();
         if (flushListener != null) {
             final byte[] newValueBytes = entry.newValue();
-            final byte[] oldValueBytes = newValueBytes == null || 
sendOldValues ? underlying.fetch(key, windowStartTimestamp) : null;
+            final byte[] oldValueBytes = newValueBytes == null || 
sendOldValues ? wrapped().fetch(key, windowStartTimestamp) : null;
 
             // this is an optimization: if this key did not exist in 
underlying store and also not in the cache,
             // we can skip flushing to downstream as well as writing to 
underlying store
@@ -106,7 +104,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
                 final V newValue = newValueBytes != null ? 
serdes.valueFrom(newValueBytes) : null;
                 final V oldValue = sendOldValues && oldValueBytes != null ? 
serdes.valueFrom(oldValueBytes) : null;
                 // we need to get the old values if needed, and then put to 
store, and then flush
-                underlying.put(key, entry.newValue(), windowStartTimestamp);
+                wrapped().put(key, entry.newValue(), windowStartTimestamp);
 
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
@@ -121,7 +119,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
                 }
             }
         } else {
-            underlying.put(key, entry.newValue(), windowStartTimestamp);
+            wrapped().put(key, entry.newValue(), windowStartTimestamp);
         }
     }
 
@@ -135,14 +133,14 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
     @Override
     public synchronized void flush() {
         cache.flush(name);
-        underlying.flush();
+        wrapped().flush();
     }
 
     @Override
     public void close() {
         flush();
         cache.close(name);
-        underlying.close();
+        wrapped().close();
     }
 
     @Override
@@ -175,11 +173,11 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, 
timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
         if (cache == null) {
-            return underlying.fetch(key, timestamp);
+            return wrapped().fetch(key, timestamp);
         }
         final LRUCacheEntry entry = cache.get(name, cacheKey);
         if (entry == null) {
-            return underlying.fetch(key, timestamp);
+            return wrapped().fetch(key, timestamp);
         } else {
             return entry.value();
         }
@@ -192,7 +190,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         // if store is open outside as well.
         validateStoreOpen();
 
-        final WindowStoreIterator<byte[]> underlyingIterator = 
underlying.fetch(key, timeFrom, timeTo);
+        final WindowStoreIterator<byte[]> underlyingIterator = 
wrapped().fetch(key, timeFrom, timeTo);
         if (cache == null) {
             return underlyingIterator;
         }
@@ -218,7 +216,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         // if store is open outside as well.
         validateStoreOpen();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
underlying.fetch(from, to, timeFrom, timeTo);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().fetch(from, to, timeFrom, timeTo);
         if (cache == null) {
             return underlyingIterator;
         }
@@ -245,7 +243,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         validateStoreOpen();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]>  underlyingIterator = 
underlying.all();
+        final KeyValueIterator<Windowed<Bytes>, byte[]>  underlyingIterator = 
wrapped().all();
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.all(name);
 
         return new MergedSortedCacheWindowStoreKeyValueIterator(
@@ -262,7 +260,7 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
         validateStoreOpen();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
underlying.fetchAll(timeFrom, timeTo);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().fetchAll(timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.all(name);
 
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(null, null, timeFrom, timeTo);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 94c250c..d5f5ad2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -28,25 +28,23 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
-public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]> {
-    private final KeyValueStore<Bytes, byte[]> inner;
+public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, 
byte[]> {
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
     ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
         super(inner);
-        this.inner = inner;
     }
 
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        inner.init(context, root);
-        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
inner.name());
-        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new 
StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+        super.init(context, root);
+        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
+        this.changeLogger = new StoreChangeLogger<>(name(), context, new 
StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
 
         // if the inner store is an LRU cache, add the eviction listener to 
log removed record
-        if (inner instanceof MemoryLRUCache) {
-            ((MemoryLRUCache<Bytes, byte[]>) inner).setWhenEldestRemoved((key, 
value) -> {
+        if (wrapped() instanceof MemoryLRUCache) {
+            ((MemoryLRUCache<Bytes, byte[]>) 
wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
                 changeLogger.logChange(key, null);
             });
@@ -55,13 +53,13 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore.AbstractS
 
     @Override
     public long approximateNumEntries() {
-        return inner.approximateNumEntries();
+        return wrapped().approximateNumEntries();
     }
 
     @Override
     public void put(final Bytes key,
                     final byte[] value) {
-        inner.put(key, value);
+        wrapped().put(key, value);
         changeLogger.logChange(key, value);
     }
 
@@ -77,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore.AbstractS
 
     @Override
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
-        inner.putAll(entries);
+        wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             changeLogger.logChange(entry.key, entry.value);
         }
@@ -85,24 +83,24 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore.AbstractS
 
     @Override
     public byte[] delete(final Bytes key) {
-        final byte[] oldValue = inner.delete(key);
+        final byte[] oldValue = wrapped().delete(key);
         changeLogger.logChange(key, null);
         return oldValue;
     }
 
     @Override
     public byte[] get(final Bytes key) {
-        return inner.get(key);
+        return wrapped().get(key);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                  final Bytes to) {
-        return inner.range(from, to);
+        return wrapped().range(from, to);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
-        return inner.all();
+        return wrapped().all();
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 3ddbede..1ed163b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -27,25 +27,23 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 /**
- * Simple wrapper around a {@link SegmentedBytesStore} to support writing
+ * Simple wrapper around a {@link SessionStore} to support writing
  * updates to a changelog
  */
-class ChangeLoggingSessionBytesStore extends 
WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]> {
+class ChangeLoggingSessionBytesStore extends 
WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<Bytes, 
byte[]> {
 
-    private final SessionStore<Bytes, byte[]> bytesStore;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
     ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> 
bytesStore) {
         super(bytesStore);
-        this.bytesStore = bytesStore;
     }
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        bytesStore.init(context, root);
+        super.init(context, root);
         final String topic = ProcessorStateManager.storeChangelogTopic(
                 context.applicationId(),
-                bytesStore.name());
+                name());
         changeLogger = new StoreChangeLogger<>(
                 name(),
                 context,
@@ -55,30 +53,30 @@ class ChangeLoggingSessionBytesStore extends 
WrappedStateStore.AbstractStateStor
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-        return bytesStore.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+        return wrapped().findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom, final Bytes keyTo, final long earliestSessionEndTime, final long 
latestSessionStartTime) {
-        return bytesStore.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime);
+        return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime);
     }
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        bytesStore.remove(sessionKey);
+        wrapped().remove(sessionKey);
         changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
-        bytesStore.put(sessionKey, aggregate);
+        wrapped().put(sessionKey, aggregate);
         changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), 
aggregate);
 
     }
 
     @Override
     public byte[] fetchSession(final Bytes key, final long startTime, final 
long endTime) {
-        return bytesStore.fetchSession(key, startTime, endTime);
+        return wrapped().fetchSession(key, startTime, endTime);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index a592471..a614f92 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -28,12 +28,11 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 /**
- * Simple wrapper around a {@link SegmentedBytesStore} to support writing
+ * Simple wrapper around a {@link WindowStore} to support writing
  * updates to a changelog
  */
-class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]> {
+class ChangeLoggingWindowBytesStore extends 
WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<Bytes, 
byte[]> {
 
-    private final WindowStore<Bytes, byte[]> bytesStore;
     private final boolean retainDuplicates;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private ProcessorContext context;
@@ -42,36 +41,35 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
     ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
                                   final boolean retainDuplicates) {
         super(bytesStore);
-        this.bytesStore = bytesStore;
         this.retainDuplicates = retainDuplicates;
     }
 
     @Override
     public byte[] fetch(final Bytes key, final long timestamp) {
-        return bytesStore.fetch(key, timestamp);
+        return wrapped().fetch(key, timestamp);
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, 
final long to) {
-        return bytesStore.fetch(key, from, to);
+        return wrapped().fetch(key, from, to);
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo, final long from, final long to) {
-        return bytesStore.fetch(keyFrom, keyTo, from, to);
+        return wrapped().fetch(keyFrom, keyTo, from, to);
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
-        return bytesStore.all();
+        return wrapped().all();
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
-        return bytesStore.fetchAll(timeFrom, timeTo);
+        return wrapped().fetchAll(timeFrom, timeTo);
     }
 
     @Override
@@ -81,15 +79,15 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
 
     @Override
     public void put(final Bytes key, final byte[] value, final long 
windowStartTimestamp) {
-        bytesStore.put(key, value, windowStartTimestamp);
+        wrapped().put(key, value, windowStartTimestamp);
         changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
     }
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
-        bytesStore.init(context, root);
-        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
bytesStore.name());
+        super.init(context, root);
+        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
         changeLogger = new StoreChangeLogger<>(
             name(),
             context,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index f3d1cae..0c08606 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -45,9 +45,8 @@ import static 
org.apache.kafka.streams.state.internals.metrics.Sensors.createTas
  * @param <K>
  * @param <V>
  */
-public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
+public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<K, V> {
 
-    private final KeyValueStore<Bytes, byte[]> inner;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private StateSerdes<K, V> serdes;
@@ -71,7 +70,6 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
                          final Serde<K> keySerde,
                          final Serde<V> valueSerde) {
         super(inner);
-        this.inner = inner;
         this.metricScope = metricScope;
         this.time = time != null ? time : Time.SYSTEM;
         this.keySerde = keySerde;
@@ -108,12 +106,12 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
         if (restoreTime.shouldRecord()) {
             measureLatency(
                 () -> {
-                    inner.init(context, root);
+                    super.init(context, root);
                     return null;
                 },
                 restoreTime);
         } else {
-            inner.init(context, root);
+            super.init(context, root);
         }
     }
 
@@ -125,16 +123,16 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
 
     @Override
     public long approximateNumEntries() {
-        return inner.approximateNumEntries();
+        return wrapped().approximateNumEntries();
     }
 
     @Override
     public V get(final K key) {
         try {
             if (getTime.shouldRecord()) {
-                return measureLatency(() -> 
outerValue(inner.get(keyBytes(key))), getTime);
+                return measureLatency(() -> 
outerValue(wrapped().get(keyBytes(key))), getTime);
             } else {
-                return outerValue(inner.get(keyBytes(key)));
+                return outerValue(wrapped().get(keyBytes(key)));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
@@ -148,11 +146,11 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
         try {
             if (putTime.shouldRecord()) {
                 measureLatency(() -> {
-                    inner.put(keyBytes(key), serdes.rawValue(value));
+                    wrapped().put(keyBytes(key), serdes.rawValue(value));
                     return null;
                 }, putTime);
             } else {
-                inner.put(keyBytes(key), serdes.rawValue(value));
+                wrapped().put(keyBytes(key), serdes.rawValue(value));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
@@ -165,10 +163,10 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
                          final V value) {
         if (putIfAbsentTime.shouldRecord()) {
             return measureLatency(
-                () -> outerValue(inner.putIfAbsent(keyBytes(key), 
serdes.rawValue(value))),
+                () -> outerValue(wrapped().putIfAbsent(keyBytes(key), 
serdes.rawValue(value))),
                 putIfAbsentTime);
         } else {
-            return outerValue(inner.putIfAbsent(keyBytes(key), 
serdes.rawValue(value)));
+            return outerValue(wrapped().putIfAbsent(keyBytes(key), 
serdes.rawValue(value)));
         }
     }
 
@@ -177,12 +175,12 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
         if (putAllTime.shouldRecord()) {
             measureLatency(
                 () -> {
-                    inner.putAll(innerEntries(entries));
+                    wrapped().putAll(innerEntries(entries));
                     return null;
                 },
                 putAllTime);
         } else {
-            inner.putAll(innerEntries(entries));
+            wrapped().putAll(innerEntries(entries));
         }
     }
 
@@ -190,9 +188,9 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
     public V delete(final K key) {
         try {
             if (deleteTime.shouldRecord()) {
-                return measureLatency(() -> 
outerValue(inner.delete(keyBytes(key))), deleteTime);
+                return measureLatency(() -> 
outerValue(wrapped().delete(keyBytes(key))), deleteTime);
             } else {
-                return outerValue(inner.delete(keyBytes(key)));
+                return outerValue(wrapped().delete(keyBytes(key)));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
@@ -204,13 +202,13 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
     public KeyValueIterator<K, V> range(final K from,
                                         final K to) {
         return new MeteredKeyValueIterator(
-            inner.range(Bytes.wrap(serdes.rawKey(from)), 
Bytes.wrap(serdes.rawKey(to))),
+            wrapped().range(Bytes.wrap(serdes.rawKey(from)), 
Bytes.wrap(serdes.rawKey(to))),
             rangeTime);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator(inner.all(), allTime);
+        return new MeteredKeyValueIterator(wrapped().all(), allTime);
     }
 
     @Override
@@ -218,12 +216,12 @@ public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateS
         if (flushTime.shouldRecord()) {
             measureLatency(
                 () -> {
-                    inner.flush();
+                    super.flush();
                     return null;
                 },
                 flushTime);
         } else {
-            inner.flush();
+            super.flush();
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 3bb7fca..0db67c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -36,8 +36,7 @@ import java.util.Objects;
 import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
 import static 
org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
 
-public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateStore implements SessionStore<K, V> {
-    private final SessionStore<Bytes, byte[]> inner;
+public class MeteredSessionStore<K, V> extends 
WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<K, V> {
     private final String metricScope;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
@@ -56,7 +55,6 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
                         final Serde<V> valueSerde,
                         final Time time) {
         super(inner);
-        this.inner = inner;
         this.metricScope = metricScope;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
@@ -88,7 +86,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
-            inner.init(context, root);
+            super.init(context, root);
         } finally {
             metrics.recordLatency(
                 restoreTime,
@@ -112,7 +110,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         Objects.requireNonNull(key, "key cannot be null");
         final Bytes bytesKey = keyBytes(key);
         return new MeteredWindowedKeyValueIterator<>(
-            inner.findSessions(
+            wrapped().findSessions(
                 bytesKey,
                 earliestSessionEndTime,
                 latestSessionStartTime),
@@ -132,7 +130,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         final Bytes bytesKeyFrom = keyBytes(keyFrom);
         final Bytes bytesKeyTo = keyBytes(keyTo);
         return new MeteredWindowedKeyValueIterator<>(
-            inner.findSessions(
+            wrapped().findSessions(
                 bytesKeyFrom,
                 bytesKeyTo,
                 earliestSessionEndTime,
@@ -149,7 +147,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         final long startNs = time.nanoseconds();
         try {
             final Bytes key = keyBytes(sessionKey.key());
-            inner.remove(new Windowed<>(key, sessionKey.window()));
+            wrapped().remove(new Windowed<>(key, sessionKey.window()));
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), 
sessionKey.key());
             throw new ProcessorStateException(message, e);
@@ -165,7 +163,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         final long startNs = time.nanoseconds();
         try {
             final Bytes key = keyBytes(sessionKey.key());
-            inner.put(new Windowed<>(key, sessionKey.window()), 
serdes.rawValue(aggregate));
+            wrapped().put(new Windowed<>(key, sessionKey.window()), 
serdes.rawValue(aggregate));
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), 
sessionKey.key(), aggregate);
             throw new ProcessorStateException(message, e);
@@ -185,7 +183,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
         final Bytes bytesKey = keyBytes(key);
         final long startNs = time.nanoseconds();
         try {
-            value = serdes.valueFrom(inner.fetchSession(bytesKey, startTime, 
endTime));
+            value = serdes.valueFrom(wrapped().fetchSession(bytesKey, 
startTime, endTime));
         } finally {
             metrics.recordLatency(flushTime, startNs, time.nanoseconds());
         }
@@ -211,7 +209,7 @@ public class MeteredSessionStore<K, V> extends 
WrappedStateStore.AbstractStateSt
     public void flush() {
         final long startNs = time.nanoseconds();
         try {
-            inner.flush();
+            super.flush();
         } finally {
             metrics.recordLatency(flushTime, startNs, time.nanoseconds());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 166c300..2bbfc45 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -36,9 +36,8 @@ import java.util.Map;
 import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
 import static 
org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
 
-public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
+public class MeteredWindowStore<K, V> extends 
WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<K, V> {
 
-    private final WindowStore<Bytes, byte[]> inner;
     private final String metricScope;
     private final Time time;
     private final Serde<K> keySerde;
@@ -57,7 +56,6 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde) {
         super(inner);
-        this.inner = inner;
         this.metricScope = metricScope;
         this.time = time;
         this.keySerde = keySerde;
@@ -88,7 +86,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
-            inner.init(context, root);
+            super.init(context, root);
         } finally {
             metrics.recordLatency(
                 restoreTime,
@@ -116,7 +114,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
                     final long windowStartTimestamp) {
         final long startNs = time.nanoseconds();
         try {
-            inner.put(keyBytes(key), serdes.rawValue(value), 
windowStartTimestamp);
+            wrapped().put(keyBytes(key), serdes.rawValue(value), 
windowStartTimestamp);
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
             throw new ProcessorStateException(message, e);
@@ -134,7 +132,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
                    final long timestamp) {
         final long startNs = time.nanoseconds();
         try {
-            final byte[] result = inner.fetch(keyBytes(key), timestamp);
+            final byte[] result = wrapped().fetch(keyBytes(key), timestamp);
             if (result == null) {
                 return null;
             }
@@ -149,7 +147,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     public WindowStoreIterator<V> fetch(final K key,
                                         final long timeFrom,
                                         final long timeTo) {
-        return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), 
timeFrom, timeTo),
+        return new MeteredWindowStoreIterator<>(wrapped().fetch(keyBytes(key), 
timeFrom, timeTo),
                                                 fetchTime,
                                                 metrics,
                                                 serdes,
@@ -158,7 +156,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, 
metrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), 
fetchTime, metrics, serdes, time);
     }
 
     @SuppressWarnings("deprecation")
@@ -166,7 +164,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                      final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
-            inner.fetchAll(timeFrom, timeTo),
+            wrapped().fetchAll(timeFrom, timeTo),
             fetchTime,
             metrics,
             serdes,
@@ -180,7 +178,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
                                                   final long timeFrom,
                                                   final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
-            inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
+            wrapped().fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
             fetchTime,
             metrics,
             serdes,
@@ -191,7 +189,7 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     public void flush() {
         final long startNs = time.nanoseconds();
         try {
-            inner.flush();
+            super.flush();
         } finally {
             metrics.recordLatency(flushTime, startNs, time.nanoseconds());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java
index 64733b2..9046e37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java
@@ -18,34 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-import java.nio.ByteBuffer;
-
 public interface RecordConverter {
     ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], 
byte[]> record);
-
-    @SuppressWarnings("deprecation")
-    static RecordConverter converter() {
-        return record -> {
-            final byte[] rawValue = record.value();
-            final long timestamp = record.timestamp();
-            return new ConsumerRecord<>(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                timestamp,
-                record.timestampType(),
-                record.checksum(),
-                record.serializedKeySize(),
-                record.serializedValueSize(),
-                record.key(),
-                ByteBuffer
-                    .allocate(8 + rawValue.length)
-                    .putLong(timestamp)
-                    .put(rawValue)
-                    .array(),
-                record.headers(),
-                record.leaderEpoch()
-            );
-        };
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
new file mode 100644
index 0000000..f65cc32
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.nio.ByteBuffer;
+
+public final class RecordConverters {
+    private static final RecordConverter IDENTITY_INSTANCE = record -> record;
+
+    @SuppressWarnings("deprecation")
+    private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
+        final byte[] rawValue = record.value();
+        final long timestamp = record.timestamp();
+        return new ConsumerRecord<>(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            timestamp,
+            record.timestampType(),
+            record.checksum(),
+            record.serializedKeySize(),
+            record.serializedValueSize(),
+            record.key(),
+            ByteBuffer
+                .allocate(8 + rawValue.length)
+                .putLong(timestamp)
+                .put(rawValue)
+                .array(),
+            record.headers(),
+            record.leaderEpoch()
+        );
+    };
+
+    // privatize the constructor so the class cannot be instantiated (only used for its static members)
+    private RecordConverters() {}
+
+    public static RecordConverter rawValueToTimestampedValue() {
+        return RAW_TO_TIMESTAMED_INSTANCE;
+    }
+
+    public static RecordConverter identity() {
+        return IDENTITY_INSTANCE;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index be423bc..d855442 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -27,11 +27,10 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 
-public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
+public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore<SegmentedBytesStore> implements SessionStore<K, AGG> {
 
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
-    private final SegmentedBytesStore bytesStore;
 
     private StateSerdes<K, AGG> serdes;
     private String topic;
@@ -41,14 +40,13 @@ public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractState
                         final Serde<AGG> aggSerde) {
         super(bytesStore);
         this.keySerde = keySerde;
-        this.bytesStore = bytesStore;
         this.aggSerde = aggSerde;
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
-        final String storeName = bytesStore.name();
+        final String storeName = name();
         topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
 
         serdes = new StateSerdes<>(
@@ -56,12 +54,12 @@ public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractState
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
-        bytesStore.init(context, root);
+        super.init(context, root);
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final 
long earliestSessionEndTime, final long latestSessionStartTime) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
             Bytes.wrap(serdes.rawKey(key)),
             earliestSessionEndTime,
             latestSessionStartTime
@@ -71,7 +69,7 @@ public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractState
 
     @Override
     public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, 
final K keyTo, final long earliestSessionEndTime, final long 
latestSessionStartTime) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
             Bytes.wrap(serdes.rawKey(keyFrom)),
             Bytes.wrap(serdes.rawKey(keyTo)),
             earliestSessionEndTime,
@@ -82,7 +80,7 @@ public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractState
 
     @Override
     public AGG fetchSession(final K key, final long startTime, final long 
endTime) {
-        return 
serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)),
 startTime, endTime)));
+        return 
serdes.valueFrom(wrapped().get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)),
 startTime, endTime)));
     }
 
     @Override
@@ -97,11 +95,11 @@ public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore.AbstractState
 
     @Override
     public void remove(final Windowed<K> key) {
-        bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, 
serdes.keySerializer(), topic)));
+        wrapped().remove(Bytes.wrap(SessionKeySchema.toBinary(key, 
serdes.keySerializer(), topic)));
     }
 
     @Override
     public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, 
serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
+        wrapped().put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, 
serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index bb13c74..c22ca52 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -27,13 +27,12 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
+public class RocksDBWindowStore<K, V> extends 
WrappedStateStore<SegmentedBytesStore> implements WindowStore<K, V> {
 
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final boolean retainDuplicates;
     private final long windowSize;
-    private final SegmentedBytesStore bytesStore;
 
     private ProcessorContext context;
     private StateSerdes<K, V> serdes;
@@ -47,7 +46,6 @@ public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
         super(bytesStore);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
-        this.bytesStore = bytesStore;
         this.retainDuplicates = retainDuplicates;
         this.windowSize = windowSize;
     }
@@ -57,11 +55,11 @@ public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         // construct the serde
-        serdes = new 
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 bytesStore.name()),
+        serdes = new 
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 name()),
                                    keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
                                    valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
 
-        bytesStore.init(context, root);
+        super.init(context, root);
     }
 
     @Override
@@ -73,12 +71,12 @@ public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     public void put(final K key, final V value, final long 
windowStartTimestamp) {
         maybeUpdateSeqnumForDups();
 
-        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
+        wrapped().put(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
     }
 
     @Override
     public V fetch(final K key, final long timestamp) {
-        final byte[] bytesValue = 
bytesStore.get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, 
serdes));
+        final byte[] bytesValue = 
wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes));
         if (bytesValue == null) {
             return null;
         }
@@ -88,27 +86,27 @@ public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     @SuppressWarnings("deprecation")
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).valuesIterator();
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), 
Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), 
timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().all();
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, 
final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
bytesStore.fetchAll(timeFrom, timeTo);
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetchAll(timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 2bb60bc..65cd484 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -19,81 +19,66 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
  * A storage engine wrapper for utilities like logging, caching, and metering.
  */
-public interface WrappedStateStore extends StateStore {
-
-    /**
-     * Return the inner most storage engine
-     *
-     * @return wrapped inner storage engine
-     */
-    StateStore inner();
-
-    /**
-     * Return the state store this store directly wraps
-     * @return the state store this store directly wraps
-     */
-    StateStore wrappedStore();
-
-    abstract class AbstractStateStore implements WrappedStateStore {
-        final StateStore innerState;
-
-        protected AbstractStateStore(final StateStore inner) {
-            this.innerState = inner;
+public abstract class WrappedStateStore<S extends StateStore> implements StateStore {
+    public static boolean isTimestamped(final StateStore stateStore) {
+        if (stateStore instanceof TimestampedBytesStore) {
+            return true;
+        } else if (stateStore instanceof WrappedStateStore) {
+            return isTimestamped(((WrappedStateStore) stateStore).wrapped());
+        } else {
+            return false;
         }
+    }
 
-        @Override
-        public void init(final ProcessorContext context,
-                         final StateStore root) {
-            innerState.init(context, root);
-        }
+    private final S wrapped;
 
-        @Override
-        public String name() {
-            return innerState.name();
-        }
+    public WrappedStateStore(final S wrapped) {
+        this.wrapped = wrapped;
+    }
 
-        @Override
-        public boolean persistent() {
-            return innerState.persistent();
-        }
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        wrapped.init(context, root);
+    }
 
-        @Override
-        public boolean isOpen() {
-            return innerState.isOpen();
-        }
+    @Override
+    public String name() {
+        return wrapped.name();
+    }
 
-        void validateStoreOpen() {
-            if (!innerState.isOpen()) {
-                throw new InvalidStateStoreException("Store " + 
innerState.name() + " is currently closed.");
-            }
-        }
+    @Override
+    public boolean persistent() {
+        return wrapped.persistent();
+    }
 
-        @Override
-        public StateStore inner() {
-            if (innerState instanceof WrappedStateStore) {
-                return ((WrappedStateStore) innerState).inner();
-            }
-            return innerState;
-        }
+    @Override
+    public boolean isOpen() {
+        return wrapped.isOpen();
+    }
 
-        @Override
-        public void flush() {
-            innerState.flush();
+    void validateStoreOpen() {
+        if (!wrapped.isOpen()) {
+            throw new InvalidStateStoreException("Store " + wrapped.name() + " is currently closed.");
         }
+    }
 
-        @Override
-        public void close() {
-            innerState.close();
-        }
+    @Override
+    public void flush() {
+        wrapped.flush();
+    }
 
-        @Override
-        public StateStore wrappedStore() {
-            return innerState;
-        }
+    @Override
+    public void close() {
+        wrapped.close();
+    }
 
+    public S wrapped() {
+        return wrapped;
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 698a0ab..6b8b5b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -692,7 +692,7 @@ public class KafkaStreamsTest {
                 .addSink("sink", outputTopic, new StringSerializer(), new 
StringSerializer(), "process");
 
         final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = 
Stores.keyValueStoreBuilder(
-                isPersistentStore ? 
Stores.persistentKeyValueStore(globalStoreName) : 
Stores.inMemoryKeyValueStore(globalStoreName), 
+                isPersistentStore ? 
Stores.persistentKeyValueStore(globalStoreName) : 
Stores.inMemoryKeyValueStore(globalStoreName),
                 Serdes.String(), Serdes.String()).withLoggingDisabled();
         topology.addGlobalStore(globalStoreBuilder,
                 "global",
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 0e41794..c8879a6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -31,9 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -240,47 +238,11 @@ public class GlobalStateManagerImplTest {
         initializeConsumer(1, 0, t1);
 
         stateManager.initialize();
-        stateManager.register(new WrappedStateStore() {
-            @Override
-            public StateStore inner() {
-                return store1;
-            }
-
-            @Override
-            public StateStore wrappedStore() {
-                return store1;
-            }
-
-            @Override
-            public String name() {
-                return store1.name();
-            }
-
-            @Override
-            public void init(final ProcessorContext context, final StateStore 
root) {
-                store1.init(context, root);
-            }
-
-            @Override
-            public void flush() {
-                store1.flush();
-            }
-
-            @Override
-            public void close() {
-                store1.close();
-            }
-
-            @Override
-            public boolean persistent() {
-                return store1.persistent();
-            }
-
-            @Override
-            public boolean isOpen() {
-                return store1.isOpen();
-            }
-        }, stateRestoreCallback);
+        stateManager.register(
+            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>>(store1) {
+            },
+            stateRestoreCallback
+        );
 
         final KeyValue<byte[], byte[]> restoredRecord = 
stateRestoreCallback.restored.get(0);
         assertEquals(3, restoredRecord.key.length);
@@ -304,47 +266,11 @@ public class GlobalStateManagerImplTest {
         initializeConsumer(1, 0, t2);
 
         stateManager.initialize();
-        stateManager.register(new WrappedStateStore() {
-            @Override
-            public StateStore inner() {
-                return store2;
-            }
-
-            @Override
-            public StateStore wrappedStore() {
-                return store2;
-            }
-
-            @Override
-            public String name() {
-                return store2.name();
-            }
-
-            @Override
-            public void init(final ProcessorContext context, final StateStore 
root) {
-                store2.init(context, root);
-            }
-
-            @Override
-            public void flush() {
-                store2.flush();
-            }
-
-            @Override
-            public void close() {
-                store2.close();
-            }
-
-            @Override
-            public boolean persistent() {
-                return store2.persistent();
-            }
-
-            @Override
-            public boolean isOpen() {
-                return store2.isOpen();
-            }
-        }, stateRestoreCallback);
+        stateManager.register(
+            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>>(store2) {
+            },
+            stateRestoreCallback
+        );
 
         final KeyValue<byte[], byte[]> restoredRecord = 
stateRestoreCallback.restored.get(0);
         assertEquals(3, restoredRecord.key.length);
@@ -402,7 +328,7 @@ public class GlobalStateManagerImplTest {
         offsetCheckpoint.write(Collections.singletonMap(t1, 5L));
 
         stateManager.initialize();
-        stateManager.register(store1,  stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         assertEquals(5, stateRestoreCallback.restored.size());
     }
 
@@ -729,15 +655,15 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws 
IOException {
         final File storeDirectory1 = new 
File(stateDirectory.globalStateDir().getAbsolutePath()
-            + File.separator + "rocksdb"
-            + File.separator + storeName1);
+                                                  + File.separator + "rocksdb"
+                                                  + File.separator + 
storeName1);
         final File storeDirectory2 = new 
File(stateDirectory.globalStateDir().getAbsolutePath()
-            + File.separator + "rocksdb"
-            + File.separator + storeName2);
+                                                  + File.separator + "rocksdb"
+                                                  + File.separator + 
storeName2);
         final File storeDirectory3 = new 
File(stateDirectory.globalStateDir().getAbsolutePath()
-            + File.separator + storeName3);
+                                                  + File.separator + 
storeName3);
         final File storeDirectory4 = new 
File(stateDirectory.globalStateDir().getAbsolutePath()
-            + File.separator + storeName4);
+                                                  + File.separator + 
storeName4);
         final File testFile1 = new File(storeDirectory1.getAbsolutePath() + 
File.separator + "testFile");
         final File testFile2 = new File(storeDirectory2.getAbsolutePath() + 
File.separator + "testFile");
         final File testFile3 = new File(storeDirectory3.getAbsolutePath() + 
File.separator + "testFile");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 494ae02..bb1dec2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -59,8 +59,8 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrappedStore();
-        final StateStore logging = caching.wrappedStore();
+        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final StateStore logging = caching.wrapped();
         assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(caching, instanceOf(CachedStateStore.class));
         assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
@@ -74,7 +74,7 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
 
@@ -86,9 +86,9 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertThat(caching, instanceOf(CachedStateStore.class));
-        assertThat(caching.wrappedStore(), 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+        assertThat(caching.wrapped(), 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
 
     @Test
@@ -99,7 +99,7 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
         assertThat(wrapped, 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
@@ -117,9 +117,8 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();
-        final StateStore inner = ((WrappedStateStore) built).inner();
 
-        assertThat(inner, CoreMatchers.equalTo(store));
+        assertThat(store.name(), CoreMatchers.equalTo(built.name()));
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 8ed23a6..4622c01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 
+import static org.apache.kafka.streams.state.internals.RecordConverters.identity;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -42,7 +43,7 @@ public class StateRestorerTest {
         OFFSET_LIMIT,
         true,
         "storeName",
-        record -> record);
+        identity());
 
     @Before
     public void setUp() {
@@ -79,7 +80,7 @@ public class StateRestorerTest {
             0,
             true,
             "storeName",
-            record -> record);
+            identity());
         assertTrue(restorer.hasCompleted(0, 10));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ed82421..b13ed53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -48,6 +48,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.state.internals.RecordConverters.identity;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
@@ -107,7 +108,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
@@ -145,7 +146,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -170,7 +171,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         EasyMock.replay(active, task);
@@ -185,7 +186,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
         // retry restore should succeed
         assertEquals(1, changelogReader.restore(active).size());
         assertThat(callback.restored.size(), equalTo(messages));
@@ -210,7 +211,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record);
+            identity());
         changelogReader.register(stateRestorer);
 
         EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@@ -240,7 +241,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(5));
@@ -257,7 +258,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -274,7 +275,7 @@ public class StoreChangelogReaderTest {
             3,
             true,
             "storeName",
-            record -> record);
+            identity());
         changelogReader.register(restorer);
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
@@ -302,7 +303,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName1",
-            record -> record));
+            identity()));
         changelogReader.register(new StateRestorer(
             one,
             restoreListener1,
@@ -310,7 +311,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName2",
-            record -> record));
+            identity()));
         changelogReader.register(new StateRestorer(
             two,
             restoreListener2,
@@ -318,7 +319,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName3",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(one)).andStubReturn(task);
         expect(active.restoringTaskFor(two)).andStubReturn(task);
@@ -350,7 +351,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName1",
-            record -> record));
+            identity()));
         changelogReader.register(new StateRestorer(
             one,
             restoreListener1,
@@ -358,7 +359,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName2",
-            record -> record));
+            identity()));
         changelogReader.register(new StateRestorer(
             two,
             restoreListener2,
@@ -366,7 +367,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName3",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
@@ -401,7 +402,7 @@ public class StoreChangelogReaderTest {
             5,
             true,
             "storeName1",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -437,7 +438,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record);
+            identity());
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
@@ -457,7 +458,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record);
+            identity());
 
         changelogReader.register(restorer);
 
@@ -476,7 +477,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -495,7 +496,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             false,
             "storeName",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -518,7 +519,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             false,
             "storeName",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -537,7 +538,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "store",
-            record -> record));
+            identity()));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
 
@@ -559,7 +560,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             false,
             "storeName",
-            record -> record));
+            identity()));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@@ -581,7 +582,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             false,
             "otherStore",
-            record -> record));
+            identity()));
 
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
@@ -605,7 +606,7 @@ public class StoreChangelogReaderTest {
             9L,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -627,7 +628,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -648,7 +649,7 @@ public class StoreChangelogReaderTest {
             Long.MAX_VALUE,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -668,7 +669,7 @@ public class StoreChangelogReaderTest {
             5,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -689,7 +690,7 @@ public class StoreChangelogReaderTest {
             10,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -717,7 +718,7 @@ public class StoreChangelogReaderTest {
             6,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -740,7 +741,7 @@ public class StoreChangelogReaderTest {
             11,
             true,
             "storeName",
-            record -> record));
+            identity()));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 12dffba..c46498c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -51,7 +51,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@@ -96,7 +95,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build();
         final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
 
-        final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrapped();
         inner.setFlushListener(cacheFlushListener, false);
         store.init(context, store);
         return store;
@@ -358,7 +357,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldReturnUnderlying() {
-        assertTrue(store.underlying().equals(underlyingStore));
+        assertEquals(store.wrapped(), underlyingStore);
     }
 
     @Test(expected = InvalidStateStoreException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
index 848bba6..8642cfa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
@@ -69,21 +69,21 @@ public class KeyValueStoreBuilderTest {
     public void shouldHaveChangeLoggingStoreByDefault() {
         final KeyValueStore<String, String> store = builder.build();
         assertThat(store, instanceOf(MeteredKeyValueStore.class));
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
 
     @Test
     public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
         final KeyValueStore<String, String> store = builder.withLoggingDisabled().build();
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, CoreMatchers.equalTo(inner));
     }
 
     @Test
     public void shouldHaveCachingStoreWhenEnabled() {
         final KeyValueStore<String, String> store = builder.withCachingEnabled().build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
     }
@@ -93,10 +93,10 @@ public class KeyValueStoreBuilderTest {
         final KeyValueStore<String, String> store = builder
                 .withLoggingEnabled(Collections.emptyMap())
                 .build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
-        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.equalTo(inner));
+        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
     }
 
     @Test
@@ -105,12 +105,12 @@ public class KeyValueStoreBuilderTest {
                 .withLoggingEnabled(Collections.emptyMap())
                 .withCachingEnabled()
                 .build();
-        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
-        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
         assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(caching, instanceOf(CachingKeyValueStore.class));
         assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
-        assertThat(changeLogging.wrappedStore(), CoreMatchers.equalTo(inner));
+        assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
     }
 
     @SuppressWarnings("all")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
index 621a1c2..eb0cc55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -69,21 +69,21 @@ public class SessionStoreBuilderTest {
     @Test
     public void shouldHaveChangeLoggingStoreByDefault() {
         final SessionStore<String, String> store = builder.build();
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class));
     }
 
     @Test
     public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
         final SessionStore<String, String> store = builder.withLoggingDisabled().build();
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test
     public void shouldHaveCachingStoreWhenEnabled() {
         final SessionStore<String, String> store = builder.withCachingEnabled().build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredSessionStore.class));
         assertThat(wrapped, instanceOf(CachingSessionStore.class));
     }
@@ -93,10 +93,10 @@ public class SessionStoreBuilderTest {
         final SessionStore<String, String> store = builder
                 .withLoggingEnabled(Collections.<String, String>emptyMap())
                 .build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredSessionStore.class));
         assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class));
-        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test
@@ -105,12 +105,12 @@ public class SessionStoreBuilderTest {
                 .withLoggingEnabled(Collections.<String, String>emptyMap())
                 .withCachingEnabled()
                 .build();
-        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
-        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
         assertThat(store, instanceOf(MeteredSessionStore.class));
         assertThat(caching, instanceOf(CachingSessionStore.class));
         assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
-        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
index 25b8178..022f6dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -68,21 +68,21 @@ public class WindowStoreBuilderTest {
     @Test
     public void shouldHaveChangeLoggingStoreByDefault() {
         final WindowStore<String, String> store = builder.build();
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class));
     }
 
     @Test
     public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
         final WindowStore<String, String> store = builder.withLoggingDisabled().build();
-        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
         assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test
     public void shouldHaveCachingStoreWhenEnabled() {
         final WindowStore<String, String> store = builder.withCachingEnabled().build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredWindowStore.class));
         assertThat(wrapped, instanceOf(CachingWindowStore.class));
     }
@@ -92,10 +92,10 @@ public class WindowStoreBuilderTest {
         final WindowStore<String, String> store = builder
                 .withLoggingEnabled(Collections.<String, String>emptyMap())
                 .build();
-        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredWindowStore.class));
         assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
-        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test
@@ -104,12 +104,12 @@ public class WindowStoreBuilderTest {
                 .withLoggingEnabled(Collections.<String, String>emptyMap())
                 .withCachingEnabled()
                 .build();
-        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
-        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
         assertThat(store, instanceOf(MeteredWindowStore.class));
         assertThat(caching, instanceOf(CachingWindowStore.class));
         assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
-        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
     }
 
     @Test(expected = NullPointerException.class)

Reply via email to