http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
new file mode 100644
index 0000000..e7c59ef
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class CachingKeyValueStoreTest {
+
+    private CachingKeyValueStore<String, String> store;
+    private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
+    private ThreadCache cache;
+    private int maxCacheSizeBytes;
+    private CacheFlushListenerStub<String> cacheFlushListener;
+    private String topic;
+
+    @Before
+    public void setUp() throws Exception {
+        final String storeName = "store";
+        underlyingStore = new InMemoryKeyValueStore<>(storeName);
+        cacheFlushListener = new CacheFlushListenerStub<>();
+        store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
+        store.setFlushListener(cacheFlushListener);
+        maxCacheSizeBytes = 150;
+        cache = new ThreadCache(maxCacheSizeBytes);
+        final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache);
+        topic = "topic";
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
+        store.init(context, null);
+    }
+
+    @Test
+    public void shouldPutGetToFromCache() throws Exception {
+        store.put("key", "value");
+        store.put("key2", "value2");
+        assertEquals("value", store.get("key"));
+        assertEquals("value2", store.get("key2"));
+        // nothing evicted so underlying store should be empty
+        assertEquals(2, cache.size());
+        assertEquals(0, underlyingStore.approximateNumEntries());
+    }
+
+    @Test
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
+        int added = addItemsToCache();
+        // all dirty entries should have been flushed
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, store.approximateNumEntries());
+        assertNotNull(underlyingStore.get(Bytes.wrap("0".getBytes())));
+    }
+
+    @Test
+    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
+        int numRecords = addItemsToCache();
+        assertEquals(numRecords, cacheFlushListener.forwarded.size());
+    }
+
+    @Test
+    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
+        store.put("1", "a");
+        store.flush();
+        assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+    }
+
+    @Test
+    public void shouldForwardOldValuesWhenEnabled() throws Exception {
+        store.put("1", "a");
+        store.flush();
+        store.put("1", "b");
+        store.flush();
+        assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
+        assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
+    }
+
+    @Test
+    public void shouldIterateAllStoredItems() throws Exception {
+        int items = addItemsToCache();
+        final KeyValueIterator<String, String> all = store.all();
+        final List<String> results = new ArrayList<>();
+        while (all.hasNext()) {
+            results.add(all.next().key);
+        }
+        assertEquals(items, results.size());
+    }
+
+    @Test
+    public void shouldIterateOverRange() throws Exception {
+        int items = addItemsToCache();
+        final KeyValueIterator<String, String> range = store.range(String.valueOf(0), String.valueOf(items));
+        final List<String> results = new ArrayList<>();
+        while (range.hasNext()) {
+            results.add(range.next().key);
+        }
+        assertEquals(items, results.size());
+    }
+
+    private int addItemsToCache() throws IOException {
+        int cachedSize = 0;
+        int i = 0;
+        while (cachedSize < maxCacheSizeBytes) {
+            final String kv = String.valueOf(i++);
+            store.put(kv, kv);
+            cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), topic);
+        }
+        return i;
+    }
+
+    public static class CacheFlushListenerStub<K> implements CacheFlushListener<K, String> {
+        public final Map<K, Change<String>> forwarded = new HashMap<>();
+
+        @Override
+        public void apply(final K key, final String newValue, final String oldValue) {
+            forwarded.put(key, new Change<>(newValue, oldValue));
+        }
+    }
+}
\ No newline at end of file
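
The stub above pins down the flush-forwarding contract these tests exercise: when a dirty cache entry is flushed or evicted, the caching store invokes CacheFlushListener.apply(key, newValue, oldValue), pairing the latest value with whatever was previously flushed for that key. A minimal, self-contained sketch of that callback shape (the FlushListener interface and driver below are illustrative stand-ins, not part of this patch):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FlushListenerSketch {
        // Stand-in for CacheFlushListener<K, V>: one callback per flushed key.
        interface FlushListener<K, V> {
            void apply(K key, V newValue, V oldValue);
        }

        public static void main(String[] args) {
            final Map<String, String> forwarded = new LinkedHashMap<>();
            final FlushListener<String, String> listener = (key, newValue, oldValue) ->
                    forwarded.put(key, oldValue + " -> " + newValue);

            // Mirrors shouldForwardOldValuesWhenEnabled: two flushes of key "1".
            listener.apply("1", "a", null); // first flush: no prior value
            listener.apply("1", "b", "a");  // second flush: old value forwarded
            System.out.println(forwarded);  // {1=a -> b}
        }
    }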

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
new file mode 100644
index 0000000..49e2db3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+
+public class CachingWindowStoreTest {
+
+    private static final int MAX_CACHE_SIZE_BYTES = 150;
+    private static final Long WINDOW_SIZE = 10000L;
+    private RocksDBWindowStore<Bytes, byte[]> underlying;
+    private CachingWindowStore<String, String> cachingStore;
+    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
+    private ThreadCache cache;
+    private String topic;
+    private static final Long DEFAULT_TIMESTAMP = 10L;
+
+    @Before
+    public void setUp() throws Exception {
+        underlying = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.Bytes(), Serdes.ByteArray());
+        cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
+        cachingStore = new CachingWindowStore<>(underlying,
+                                                Serdes.String(),
+                                                Serdes.String(),
+                                                WINDOW_SIZE);
+        cachingStore.setFlushListener(cacheListener);
+        cache = new ThreadCache(MAX_CACHE_SIZE_BYTES);
+        topic = "topic";
+        final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
+        cachingStore.init(context, cachingStore);
+    }
+
+    @Test
+    public void shouldPutFetchFromCache() throws Exception {
+        cachingStore.put("a", "a");
+        cachingStore.put("b", "b");
+
+        final WindowStoreIterator<String> a = cachingStore.fetch("a", 10, 10);
+        final WindowStoreIterator<String> b = cachingStore.fetch("b", 10, 10);
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), a.next());
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "b"), b.next());
+        assertFalse(a.hasNext());
+        assertFalse(b.hasNext());
+        assertEquals(2, cache.size());
+    }
+
+    @Test
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
+        int added = addItemsToCache();
+        // all dirty entries should have been flushed
+        final WindowStoreIterator<byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+        final KeyValue<Long, byte[]> next = iter.next();
+        assertEquals(DEFAULT_TIMESTAMP, next.key);
+        assertArrayEquals("0".getBytes(), next.value);
+        assertFalse(iter.hasNext());
+        assertEquals(added - 1, cache.size());
+    }
+
+    @Test
+    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
+        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put("1", "a");
+        cachingStore.flush();
+        assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+    }
+
+    @Test
+    public void shouldForwardOldValuesWhenEnabled() throws Exception {
+        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put("1", "a");
+        cachingStore.flush();
+        cachingStore.put("1", "b");
+        cachingStore.flush();
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("a", cacheListener.forwarded.get(windowedKey).oldValue);
+    }
+
+    @Test
+    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
+        int numRecords = addItemsToCache();
+        assertEquals(numRecords, cacheListener.forwarded.size());
+    }
+
+    @Test
+    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() throws Exception {
+        cachingStore.put("1", "a", DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        cachingStore.put("1", "b", DEFAULT_TIMESTAMP);
+
+        final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "b"), fetch.next());
+        assertFalse(fetch.hasNext());
+    }
+
+    @Test
+    public void shouldIterateAcrossWindows() throws Exception {
+        cachingStore.put("1", "a", DEFAULT_TIMESTAMP);
+        cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
+
+        final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP + WINDOW_SIZE, "b"), fetch.next());
+        assertFalse(fetch.hasNext());
+    }
+
+    @Test
+    public void shouldIterateCacheAndStore() throws Exception {
+        underlying.put(Bytes.wrap("1".getBytes()), "a".getBytes());
+        cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
+        final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());
+        assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP + WINDOW_SIZE, "b"), fetch.next());
+        assertFalse(fetch.hasNext());
+    }
+
+    private int addItemsToCache() throws IOException {
+        int cachedSize = 0;
+        int i = 0;
+        while (cachedSize < MAX_CACHE_SIZE_BYTES) {
+            final String kv = String.valueOf(i++);
+            cachingStore.put(kv, kv);
+            cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), topic) +
+                8 + // timestamp
+                4; // sequenceNumber
+        }
+        return i;
+    }
+
+}
\ No newline at end of file
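
The loop in addItemsToCache above charges each windowed entry as memoryCacheEntrySize(key, value, topic) plus 8 bytes of timestamp and 4 bytes of sequence number folded into the window key. A rough worked version of that arithmetic (the 24-byte fixed overhead is an assumption borrowed from NamedCacheTest's shouldKeepTrackOfSize further down; the authoritative accounting lives in ThreadCacheTest.memoryCacheEntrySize):

    public class CacheSizingSketch {
        static final int ASSUMED_FIXED_OVERHEAD = 24; // assumption, per NamedCacheTest

        // Approximate bytes charged to the cache for one windowed entry.
        static long windowedEntrySize(final byte[] key, final byte[] value) {
            return key.length + value.length + ASSUMED_FIXED_OVERHEAD
                    + 8  // timestamp in the window key
                    + 4; // sequence number in the window key
        }

        public static void main(String[] args) {
            // Key "0" / value "0": 1 + 1 + 24 + 8 + 4 = 38 bytes, so a 150-byte
            // cache evicts after only a handful of puts.
            System.out.println(windowedEntrySize("0".getBytes(), "0".getBytes()));
        }
    }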

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
new file mode 100644
index 0000000..4a32187
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DelegatingPeekingKeyValueIteratorTest {
+
+    private InMemoryKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() throws Exception {
+        store = new InMemoryKeyValueStore<>("name");
+    }
+
+    @Test
+    public void shouldPeekNext() throws Exception {
+        store.put("A", "A");
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        assertEquals("A", peekingIterator.peekNextKey());
+        assertEquals("A", peekingIterator.peekNextKey());
+        assertTrue(peekingIterator.hasNext());
+    }
+
+    @Test
+    public void shouldPeekAndIterate() throws Exception {
+        final String[] kvs = {"a", "b", "c", "d", "e", "f"};
+        for (String kv : kvs) {
+            store.put(kv, kv);
+        }
+
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        int index = 0;
+        while (peekingIterator.hasNext()) {
+            final String peekNext = peekingIterator.peekNextKey();
+            final String key = peekingIterator.next().key;
+            assertEquals(kvs[index], peekNext);
+            assertEquals(kvs[index], key);
+            index++;
+        }
+        assertEquals(kvs.length, index);
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        peekingIterator.next();
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        peekingIterator.peekNextKey();
+    }
+
+
+}
\ No newline at end of file
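
The behaviour exercised here, peekNextKey() returning the same element until next() consumes it, and both calls throwing NoSuchElementException once exhausted, is the standard one-element-buffer pattern. A generic sketch of that pattern (illustrative only; the real DelegatingPeekingKeyValueIterator wraps a KeyValueIterator rather than a plain java.util.Iterator):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public class PeekingIteratorSketch<T> implements Iterator<T> {
        private final Iterator<T> underlying;
        private T buffered;          // the next element, if already fetched
        private boolean hasBuffered; // whether 'buffered' holds a value

        public PeekingIteratorSketch(final Iterator<T> underlying) {
            this.underlying = underlying;
        }

        // Repeated calls return the same element until next() consumes it.
        public T peek() {
            if (!hasBuffered) {
                if (!underlying.hasNext()) {
                    throw new NoSuchElementException();
                }
                buffered = underlying.next();
                hasBuffered = true;
            }
            return buffered;
        }

        @Override
        public boolean hasNext() {
            return hasBuffered || underlying.hasNext();
        }

        @Override
        public T next() {
            final T result = peek(); // throws NoSuchElementException when empty
            hasBuffered = false;
            buffered = null;
            return result;
        }
    }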

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
new file mode 100644
index 0000000..3f251b3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DelegatingPeekingWindowIteratorTest {
+
+    private static final long DEFAULT_TIMESTAMP = 0L;
+    private WindowStore<String, String> store;
+
+    @Before
+    public void setUp() throws Exception {
+        store = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.String(), Serdes.String());
+        final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, null);
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
+        store.init(context, store);
+    }
+
+    @Test
+    public void shouldPeekNext() throws Exception {
+        final KeyValue<Long, String> expected = KeyValue.pair(DEFAULT_TIMESTAMP, "A");
+        store.put("A", "A");
+        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("A", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP));
+        assertEquals(expected, peekingIterator.peekNext());
+        assertEquals(expected, peekingIterator.peekNext());
+        assertTrue(peekingIterator.hasNext());
+    }
+
+    @Test
+    public void shouldPeekAndIterate() throws Exception {
+        final List<KeyValue<Long, String>> expected = new ArrayList<>();
+        for (long t = 0; t < 50; t += 10) {
+            store.put("a", String.valueOf(t), t);
+            expected.add(KeyValue.pair(t, String.valueOf(t)));
+        }
+        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("a", 0, 50));
+        int index = 0;
+        while (peekingIterator.hasNext()) {
+            final KeyValue<Long, String> peekNext = peekingIterator.peekNext();
+            final KeyValue<Long, String> key = peekingIterator.next();
+            assertEquals(expected.get(index), peekNext);
+            assertEquals(expected.get(index), key);
+            index++;
+        }
+        assertEquals(expected.size(), index);
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
+        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10));
+        peekingIterator.next();
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
+        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10));
+        peekingIterator.peekNext();
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
new file mode 100644
index 0000000..59607ea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class MergedSortedCacheKeyValueStoreIteratorTest {
+
+    @Test
+    public void shouldIterateOverRange() throws Exception {
+        KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
+        final ThreadCache cache = new ThreadCache(1000000L);
+        byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
+        final String namespace = "one";
+        for (int i = 0; i < bytes.length - 1; i += 2) {
+            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
+        }
+
+        final Bytes from = Bytes.wrap(new byte[]{2});
+        final Bytes to = Bytes.wrap(new byte[]{9});
+        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(kv.range(from, to));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
+
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray()));
+        byte[][] values = new byte[8][];
+        int index = 0;
+        int bytesIndex = 2;
+        while (iterator.hasNext()) {
+            final byte[] value = iterator.next().value;
+            values[index++] = value;
+            assertArrayEquals(bytes[bytesIndex++], value);
+        }
+    }
+
+}
\ No newline at end of file
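
The test seeds even keys into the underlying store and odd keys into the cache, then expects the merged iterator to return keys 2 through 9 in order. That is a two-way merge of individually sorted sources; a sketch of the core step (illustrative; the real MergedSortedCacheKeyValueStoreIterator also reconciles key ties and cache deletions):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class MergeSketch {
        // Merge two individually sorted iterators by always taking the smaller head.
        static List<Integer> merge(final Iterator<Integer> a, final Iterator<Integer> b) {
            final List<Integer> out = new ArrayList<>();
            Integer headA = a.hasNext() ? a.next() : null;
            Integer headB = b.hasNext() ? b.next() : null;
            while (headA != null || headB != null) {
                if (headB == null || (headA != null && headA <= headB)) {
                    out.add(headA);
                    headA = a.hasNext() ? a.next() : null;
                } else {
                    out.add(headB);
                    headB = b.hasNext() ? b.next() : null;
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // Even keys in the "store", odd keys in the "cache", as in the test above.
            System.out.println(merge(
                    Arrays.asList(2, 4, 6, 8).iterator(),
                    Arrays.asList(3, 5, 7, 9).iterator())); // [2, 3, 4, 5, 6, 7, 8, 9]
        }
    }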

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
new file mode 100644
index 0000000..9ee8b29
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class MergedSortedCacheWindowStoreIteratorTest {
+
+    @Test
+    public void shouldIterateOverValueFromBothIterators() throws Exception {
+        final List<KeyValue<Long, byte[]>> storeValues = new ArrayList<>();
+        final ThreadCache cache = new ThreadCache(1000000L);
+        final String namespace = "one";
+        final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
+        final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();
+
+        for (long t = 0; t < 100; t += 20) {
+            final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, String.valueOf(t).getBytes());
+            storeValues.add(v1);
+            expectedKvPairs.add(v1);
+            final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
+            final byte[] valBytes = String.valueOf(t + 10).getBytes();
+            expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
+            cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes));
+        }
+
+        byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+        byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final PeekingWindowIterator<byte[]> storeIterator = new DelegatingPeekingWindowIterator<>(new WindowStoreIteratorStub(storeValues.iterator()));
+
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo);
+
+        final MergedSortedCachedWindowStoreIterator<byte[], byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray()));
+        int index = 0;
+        while (iterator.hasNext()) {
+            final KeyValue<Long, byte[]> next = iterator.next();
+            final KeyValue<Long, byte[]> expected = expectedKvPairs.get(index++);
+            assertArrayEquals(expected.value, next.value);
+            assertEquals(expected.key, next.key);
+        }
+    }
+
+    private static class WindowStoreIteratorStub implements WindowStoreIterator<byte[]> {
+
+        private final Iterator<KeyValue<Long, byte[]>> iterator;
+
+        public WindowStoreIteratorStub(final Iterator<KeyValue<Long, byte[]>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public void close() {
+            //no-op
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, byte[]> next() {
+            return iterator.next();
+        }
+
+        @Override
+        public void remove() {
+
+        }
+    }
+}
\ No newline at end of file
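
The fixture builds its range bounds with WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes). Judging from the 8-byte timestamp and 4-byte sequence-number padding in CachingWindowStoreTest.addItemsToCache, the binary key appears to append the timestamp and sequence number to the serialized key; a sketch under that assumption (field order and endianness are assumed here, not confirmed by this patch):

    import java.nio.ByteBuffer;

    public class WindowKeySketch {
        // Assumed layout: [serialized key][8-byte timestamp][4-byte sequence number].
        static byte[] toBinaryKey(final byte[] serializedKey, final long timestamp, final int seqnum) {
            return ByteBuffer.allocate(serializedKey.length + 8 + 4)
                    .put(serializedKey)
                    .putLong(timestamp)
                    .putInt(seqnum)
                    .array();
        }

        public static void main(String[] args) {
            // "a" at t=10, seq=0 -> 1 + 8 + 4 = 13 bytes. Sorting such keys groups a
            // key's windows in timestamp order, which lets the merged iterator above
            // walk cache and store in lockstep.
            System.out.println(toBinaryKey("a".getBytes(), 10L, 0).length);
        }
    }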

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
new file mode 100644
index 0000000..fe11c05
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class NamedCacheTest {
+
+    private NamedCache cache;
+
+    @Before
+    public void setUp() throws Exception {
+        cache = new NamedCache("name");
+    }
+
+    @Test
+    public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOException {
+        List<KeyValue<String, String>> toInsert = Arrays.asList(
+                new KeyValue<>("K1", "V1"),
+                new KeyValue<>("K2", "V2"),
+                new KeyValue<>("K3", "V3"),
+                new KeyValue<>("K4", "V4"),
+                new KeyValue<>("K5", "V5"));
+        for (int i = 0; i < toInsert.size(); i++) {
+            byte[] key = toInsert.get(i).key.getBytes();
+            byte[] value = toInsert.get(i).value.getBytes();
+            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, true, 1, 1, 1, ""));
+            LRUCacheEntry head = cache.first();
+            LRUCacheEntry tail = cache.last();
+            assertEquals(new String(head.value), toInsert.get(i).value);
+            assertEquals(new String(tail.value), toInsert.get(0).value);
+            assertEquals(cache.flushes(), 0);
+            assertEquals(cache.hits(), 0);
+            assertEquals(cache.misses(), 0);
+            assertEquals(cache.overwrites(), 0);
+        }
+    }
+
+    @Test
+    public void shouldKeepTrackOfSize() throws Exception {
+        final LRUCacheEntry value = new LRUCacheEntry(new byte[]{0});
+        cache.put(Bytes.wrap(new byte[]{0}), value);
+        cache.put(Bytes.wrap(new byte[]{1}), value);
+        cache.put(Bytes.wrap(new byte[]{2}), value);
+        final long size = cache.sizeInBytes();
+        // 1 byte key + 24 bytes overhead
+        assertEquals((value.size() + 25) * 3, size);
+    }
+
+    @Test
+    public void shouldPutGet() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{11}));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{12}));
+
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
+        assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value);
+        assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value);
+        assertEquals(cache.hits(), 3);
+    }
+
+    @Test
+    public void shouldPutIfAbsent() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
+        cache.putIfAbsent(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{20}));
+        cache.putIfAbsent(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{30}));
+
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
+        assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value);
+    }
+
+    @Test
+    public void shouldDeleteAndUpdateSize() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
+        final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
+        assertArrayEquals(new byte[] {10}, deleted.value);
+        assertEquals(0, cache.sizeInBytes());
+    }
+
+    @Test
+    public void shouldPutAll() throws Exception {
+        cache.putAll(Arrays.asList(KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{0})),
+                                   KeyValue.pair(new byte[] {1}, new LRUCacheEntry(new byte[]{1})),
+                                   KeyValue.pair(new byte[] {2}, new LRUCacheEntry(new byte[]{2}))));
+
+        assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value);
+        assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value);
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value);
+    }
+
+    @Test
+    public void shouldOverwriteAll() throws Exception {
+        cache.putAll(Arrays.asList(KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{0})),
+            KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{1})),
+            KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{2}))));
+
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value);
+        assertEquals(cache.overwrites(), 2);
+    }
+
+    @Test
+    public void shouldEvictEldestEntry() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}));
+
+        cache.evict();
+        assertNull(cache.get(Bytes.wrap(new byte[]{0})));
+        assertEquals(2, cache.size());
+    }
+
+    @Test
+    public void shouldFlushDirtEntriesOnEviction() throws Exception {
+        final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+
+        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                flushed.addAll(dirty);
+            }
+        });
+
+        cache.evict();
+
+        assertEquals(2, flushed.size());
+        assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
+        assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
+        assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
+        assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());
+        assertEquals(cache.flushes(), 1);
+    }
+
+    @Test
+    public void shouldGetRangeIteratorOverKeys() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+
+        final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
+        assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
+        assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetIteratorOverAllKeys() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+
+        final Iterator<Bytes> iterator = cache.allKeys();
+        assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
+        assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
+        assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+}
\ No newline at end of file
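
These tests pin down classic LRU bookkeeping: first() is the most recently used entry, last() the least, evict() drops the eldest, and dirty entries are handed to the flush listener on eviction. A compact sketch of the ordering part using an access-ordered LinkedHashMap (illustrative; the real NamedCache maintains its own linked list plus dirty flags and byte accounting):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch {
        public static void main(String[] args) {
            // accessOrder=true iterates from least to most recently used.
            final Map<String, String> lru = new LinkedHashMap<>(16, 0.75f, true);
            lru.put("K1", "V1");
            lru.put("K2", "V2");
            lru.put("K3", "V3");
            lru.get("K1"); // touch K1: it becomes most recently used

            // Evict the eldest (least recently used) entry, as NamedCache.evict() does.
            final String eldest = lru.keySet().iterator().next();
            lru.remove(eldest);
            System.out.println(eldest + " evicted; remaining: " + lru.keySet()); // K2 evicted
        }
    }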

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 6b8f3f3..5e41143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -28,6 +28,8 @@ import org.rocksdb.Options;
 
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@@ -40,17 +42,35 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
             Class<V> valueClass,
             boolean useContextSerdes) {
 
-        StateStoreSupplier supplier;
+        return createStore(context, keyClass, valueClass, useContextSerdes, false);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K, V> KeyValueStore<K, V> createStore(final ProcessorContext context, final Class<K> keyClass, final Class<V> valueClass, final boolean useContextSerdes, final boolean enableCaching) {
+
+        Stores.PersistentKeyValueFactory<?, ?> factory = null;
         if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).persistent().build();
+            factory = Stores
+                    .create("my-store")
+                    .withKeys(context.keySerde())
+                    .withValues(context.valueSerde())
+                    .persistent();
+
         } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build();
+            factory = Stores
+                    .create("my-store")
+                    .withKeys(keyClass)
+                    .withValues(valueClass)
+                    .persistent();
         }
 
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        if (enableCaching) {
+            factory.enableCaching();
+        }
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
         store.init(context, store);
         return store;
-
     }
 
     public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
@@ -71,4 +91,28 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
         assertTrue(TheRocksDbConfigSetter.called);
     }
 
+    @Test
+    public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception {
+        final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        store.put(1, "hi");
+        store.put(2, "goodbye");
+        final KeyValueIterator<Integer, String> range = store.range(1, 2);
+        assertEquals("hi", range.next().value);
+        assertEquals("goodbye", range.next().value);
+        assertFalse(range.hasNext());
+    }
+
+    @Test
+    public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
+        final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        store.put(1, "hi");
+        store.put(2, "goodbye");
+        final KeyValueIterator<Integer, String> range = store.all();
+        assertEquals("hi", range.next().value);
+        assertEquals("goodbye", range.next().value);
+        assertFalse(range.hasNext());
+    }
+
 }
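
The refactored createStore above holds on to the Stores.PersistentKeyValueFactory so enableCaching() can be applied conditionally before build(). A sketch of the resulting flow, lifted from that helper (the store name is a placeholder; the ProcessorContext is whatever the test driver supplies):

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class StoreFactorySketch {
        // Mirrors the refactored test helper: hold the factory, toggle caching, build.
        @SuppressWarnings("unchecked")
        static <K, V> KeyValueStore<K, V> create(final ProcessorContext context,
                                                 final Class<K> keyClass,
                                                 final Class<V> valueClass,
                                                 final boolean enableCaching) {
            final Stores.PersistentKeyValueFactory<?, ?> factory = Stores
                    .create("my-store") // placeholder name
                    .withKeys(keyClass)
                    .withValues(valueClass)
                    .persistent();
            if (enableCaching) {
                factory.enableCaching();
            }
            final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
            store.init(context, store);
            return store;
        }
    }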

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 521fa32..8389dd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -50,6 +50,7 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class RocksDBWindowStoreTest {
 
@@ -62,19 +63,12 @@ public class RocksDBWindowStoreTest {
     private final Serde<Integer> intSerde = Serdes.Integer();
     private final Serde<String> stringSerde = Serdes.String();
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde);
+    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
 
     @SuppressWarnings("unchecked")
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName,
-                                                                       retentionPeriod,
-                                                                       numSegments,
-                                                                       true,
-                                                                       intSerde,
-                                                                       stringSerde,
-                                                                       true,
-                                                                       Collections.<String, String>emptyMap());
-
-        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
+        final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, intSerde, stringSerde, windowSize, true, Collections.<String, String>emptyMap(), enableCaching);
+        final WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);
         return store;
     }
@@ -92,24 +86,26 @@ public class RocksDBWindowStoreTest {
         MockProcessorContext context = new MockProcessorContext(
                 null, baseDir,
                 byteArraySerde, byteArraySerde,
-                recordCollector);
+                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-        final WindowStore<Integer, String> windowStore = createWindowStore(context);
+        final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
         long currentTime = 0;
-        context.setTime(currentTime);
+        context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "one");
+
         currentTime = currentTime + segmentSize;
-        context.setTime(currentTime);
+        context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "two");
         currentTime = currentTime + segmentSize;
-        context.setTime(currentTime);
+
+        context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "three");
 
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
 
         // roll to the next segment that will close the first
         currentTime = currentTime + segmentSize;
-        context.setTime(currentTime);
+        context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "four");
 
         // should only have 2 values as the first segment is no longer open
@@ -118,6 +114,10 @@ public class RocksDBWindowStoreTest {
         assertFalse(iterator.hasNext());
     }
 
+    private ProcessorRecordContext createRecordContext(final long time) {
+        return new ProcessorRecordContext(time, 0, 0, "topic");
+    }
+
     @Test
     public void testPutAndFetch() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
@@ -137,24 +137,13 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
                 long startTime = segmentSize - 4L;
 
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
+                putFirstBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L - windowSize, startTime + 0L + windowSize)));
                 assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L - windowSize, startTime + 1L + windowSize)));
@@ -163,18 +152,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L - windowSize, startTime + 4L + windowSize)));
                 assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L - windowSize, startTime + 5L + windowSize)));
 
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
+                putSecondBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
                 assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
@@ -233,24 +211,13 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
                 long startTime = segmentSize - 4L;
 
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
+                putFirstBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L - windowSize, startTime + 0L)));
                 assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L - windowSize, startTime + 1L)));
@@ -259,18 +226,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L - windowSize, startTime + 4L)));
                 assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L - windowSize, startTime + 5L)));
 
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
+                putSecondBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
                 assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
@@ -310,6 +266,21 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, MockProcessorContext context) {
+        context.setRecordContext(createRecordContext(startTime + 3L));
+        store.put(2, "two+1");
+        context.setRecordContext(createRecordContext(startTime + 4L));
+        store.put(2, "two+2");
+        context.setRecordContext(createRecordContext(startTime + 5L));
+        store.put(2, "two+3");
+        context.setRecordContext(createRecordContext(startTime + 6L));
+        store.put(2, "two+4");
+        context.setRecordContext(createRecordContext(startTime + 7L));
+        store.put(2, "two+5");
+        context.setRecordContext(createRecordContext(startTime + 8L));
+        store.put(2, "two+6");
+    }
+
     @Test
     public void testPutAndFetchAfter() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
@@ -329,24 +300,13 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
                 long startTime = segmentSize - 4L;
 
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
+                putFirstBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L, startTime + 0L + windowSize)));
                 assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L, startTime + 1L + windowSize)));
@@ -355,18 +315,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L, startTime + 4L + windowSize)));
                 assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L, startTime + 5L + windowSize)));
 
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
+                putSecondBatch(store, startTime, context);
 
                 assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
                 assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
@@ -406,6 +355,19 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
+        context.setRecordContext(createRecordContext(startTime));
+        store.put(0, "zero");
+        context.setRecordContext(createRecordContext(startTime + 1L));
+        store.put(1, "one");
+        context.setRecordContext(createRecordContext(startTime + 2L));
+        store.put(2, "two");
+        context.setRecordContext(createRecordContext(startTime + 4L));
+        store.put(4, "four");
+        context.setRecordContext(createRecordContext(startTime + 5L));
+        store.put(5, "five");
+    }
+
     @Test
     public void testPutSameKeyTimestamp() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
@@ -425,22 +387,19 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
                 long startTime = segmentSize - 4L;
 
-                context.setTime(startTime);
+                context.setRecordContext(createRecordContext(startTime));
                 store.put(0, "zero");
 
                 assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime - windowSize, startTime + windowSize)));
 
-                context.setTime(startTime);
                 store.put(0, "zero");
-                context.setTime(startTime);
                 store.put(0, "zero+");
-                context.setTime(startTime);
                 store.put(0, "zero++");
 
                 assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), 
toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
@@ -466,6 +425,34 @@ public class RocksDBWindowStoreTest {
     }
 
     @Test
+    public void testCachingEnabled() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
+            RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new KeyValue<>(
+                        keySerializer.serialize(record.topic(), record.key()),
+                        valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                null, baseDir,
+                byteArraySerde, byteArraySerde,
+                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+
+            WindowStore<Integer, String> store = createWindowStore(context, true, false);
+            assertTrue(store instanceof CachingWindowStore);
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
     public void testRolling() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
@@ -484,36 +471,34 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
             try {
                 long startTime = segmentSize * 2;
                 long incr = segmentSize / 2;
-
-                context.setTime(startTime);
+                context.setRecordContext(createRecordContext(startTime));
                 store.put(0, "zero");
                 assertEquals(Utils.mkSet(2L), inner.segmentIds());
 
-                context.setTime(startTime + incr);
+                context.setRecordContext(createRecordContext(startTime + incr));
                 store.put(1, "one");
                 assertEquals(Utils.mkSet(2L), inner.segmentIds());
 
-                context.setTime(startTime + incr * 2);
+                context.setRecordContext(createRecordContext(startTime + incr * 2));
                 store.put(2, "two");
                 assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
 
-                context.setTime(startTime + incr * 3);
                 // (3, "three") is not put
                 assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
 
-                context.setTime(startTime + incr * 4);
+                context.setRecordContext(createRecordContext(startTime + incr * 4));
                 store.put(4, "four");
                 assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
 
-                context.setTime(startTime + incr * 5);
+                context.setRecordContext(createRecordContext(startTime + incr * 5));
                 store.put(5, "five");
                 assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
 
@@ -524,7 +509,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
                 assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
 
-                context.setTime(startTime + incr * 6);
+                context.setRecordContext(createRecordContext(startTime + incr * 6));
                 store.put(6, "six");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
@@ -537,7 +522,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
 
 
-                context.setTime(startTime + incr * 7);
+                context.setRecordContext(createRecordContext(startTime + incr * 7));
                 store.put(7, "seven");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
@@ -550,7 +535,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
                 assertEquals(Utils.mkList("seven"), toList(store.fetch(7, 
startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
 
-                context.setTime(startTime + incr * 8);
+                context.setRecordContext(createRecordContext(startTime + incr * 8));
                 store.put(8, "eight");
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
@@ -601,27 +586,27 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
-                context.setTime(startTime);
+                context.setRecordContext(createRecordContext(startTime));
                 store.put(0, "zero");
-                context.setTime(startTime + incr);
+                context.setRecordContext(createRecordContext(startTime + incr));
                 store.put(1, "one");
-                context.setTime(startTime + incr * 2);
+                context.setRecordContext(createRecordContext(startTime + incr * 2));
                 store.put(2, "two");
-                context.setTime(startTime + incr * 3);
+                context.setRecordContext(createRecordContext(startTime + incr * 3));
                 store.put(3, "three");
-                context.setTime(startTime + incr * 4);
+                context.setRecordContext(createRecordContext(startTime + incr * 4));
                 store.put(4, "four");
-                context.setTime(startTime + incr * 5);
+                context.setRecordContext(createRecordContext(startTime + incr * 5));
                 store.put(5, "five");
-                context.setTime(startTime + incr * 6);
+                context.setRecordContext(createRecordContext(startTime + incr * 6));
                 store.put(6, "six");
-                context.setTime(startTime + incr * 7);
+                context.setRecordContext(createRecordContext(startTime + incr * 7));
                 store.put(7, "seven");
-                context.setTime(startTime + incr * 8);
+                context.setRecordContext(createRecordContext(startTime + incr * 8));
                 store.put(8, "eight");
                 store.flush();
 
@@ -650,9 +635,9 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -702,31 +687,31 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {
 
                 context.setTime(0L);
+                context.setRecordContext(createRecordContext(0));
                 store.put(0, "v");
                 assertEquals(
                         Utils.mkSet(inner.segmentName(0L)),
                         segmentDirs(baseDir)
                 );
 
-                context.setTime(59999L);
+                context.setRecordContext(createRecordContext(59999));
                 store.put(0, "v");
-                context.setTime(59999L);
                 store.put(0, "v");
                 assertEquals(
                         Utils.mkSet(inner.segmentName(0L)),
                         segmentDirs(baseDir)
                 );
 
-                context.setTime(60000L);
+                context.setRecordContext(createRecordContext(60000));
                 store.put(0, "v");
                 assertEquals(
                         Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
@@ -749,7 +734,7 @@ public class RocksDBWindowStoreTest {
                         segmentDirs(baseDir)
                 );
 
-                context.setTime(180000L);
+                context.setRecordContext(createRecordContext(180000));
                 store.put(0, "v");
 
                 iter = store.fetch(0, 0L, 240000L);
@@ -765,7 +750,7 @@ public class RocksDBWindowStoreTest {
                         segmentDirs(baseDir)
                 );
 
-                context.setTime(300000L);
+                context.setRecordContext(createRecordContext(300000));
                 store.put(0, "v");
 
                 iter = store.fetch(0, 240000L, 1000000L);
@@ -805,11 +790,11 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector);
+                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
             File storeDir = new File(baseDir, windowName);
 
-            WindowStore<Integer, String> store = createWindowStore(context);
+            WindowStore<Integer, String> store = createWindowStore(context, false, true);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -825,7 +810,7 @@ public class RocksDBWindowStoreTest {
                 store.close();
             }
 
-            store = createWindowStore(context);
+            store = createWindowStore(context, false, true);
             inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index 39b127f..1788159 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -127,5 +127,6 @@ public class StateStoreTestUtils {
         public boolean isOpen() {
             return false;
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 64ce39e..a618c1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -78,7 +78,7 @@ public class StreamThreadStateStoreProviderTest {
                                   .withStringKeys()
                                   .withStringValues()
                                   .persistent()
-                                  .windowed(10, 2, false).build(), "the-processor");
+                                  .windowed(10, 10, 2, false).build(), "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -187,7 +187,7 @@ public class StreamThreadStateStoreProviderTest {
                               clientSupplier.consumer,
                               clientSupplier.producer,
                               clientSupplier.restoreConsumer,
-                              streamsConfig, new TheStreamMetrics(), stateDirectory) {
+                              streamsConfig, new TheStreamMetrics(), stateDirectory, null) {
             @Override
             protected void initializeOffsetLimits() {
 
