http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f753b0a..cc424cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -43,6 +44,8 @@ import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCacheMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,6 +110,9 @@ public class StreamThread extends Thread {
     private boolean processStandbyRecords = false;
     private AtomicBoolean initialized = new AtomicBoolean(false);
 
+    private final long cacheSizeBytes;
+    private ThreadCache cache;
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -157,6 +163,7 @@ public class StreamThread extends Thread {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.applicationId = applicationId;
+        String threadName = getName();
         this.config = config;
         this.builder = builder;
         this.sourceTopics = builder.sourceTopics();
@@ -165,10 +172,17 @@ public class StreamThread extends Thread {
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
         this.streamsMetadataState = streamsMetadataState;
-
-        // set the producer and consumer clients
-        String threadName = getName();
         threadClientId = clientId + "-" + threadName;
+        this.sensors = new StreamsMetricsImpl(metrics);
+        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
+            log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
+        }
+        this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+            config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
+        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
+        // set the producer and consumer clients
+
+
         log.info("stream-thread [{}] Creating producer client", threadName);
         this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
         log.info("stream-thread [{}] Creating consumer client", threadName);
@@ -200,8 +214,6 @@ public class StreamThread extends Thread {
         this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommitMs = timerStartedMs;
 
-        this.sensors = new StreamsMetricsImpl(metrics);
-
 
         this.running = new AtomicBoolean(true);
     }
@@ -371,11 +383,11 @@ public class StreamThread extends Thread {
                     // even when no task is assigned, we must poll to get a task.
                     requiresPoll = true;
                 }
-                maybeCommit();
+
             } else {
                 requiresPoll = true;
             }
-
+            maybeCommit();
             maybeUpdateStandbyTasks();
 
             maybeClean();
@@ -551,7 +563,7 @@ public class StreamThread extends Thread {
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory, cache);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -621,7 +633,7 @@ public class StreamThread extends Thread {
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        if (!topology.stateStoreSuppliers().isEmpty()) {
+        if (!topology.stateStores().isEmpty()) {
             return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory);
         } else {
             return null;
@@ -718,7 +730,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    private class StreamsMetricsImpl implements StreamsMetrics {
+    private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
         final Metrics metrics;
         final String metricGrpName;
         final String sensorNamePrefix;
@@ -769,6 +781,11 @@ public class StreamThread extends Thread {
             sensor.record(endNs - startNs, timerStartedMs);
         }
 
+        @Override
+        public void recordCacheSensor(Sensor sensor, double count) {
+            sensor.record(count);
+        }
+
         /**
          * @throws IllegalArgumentException if tags is not constructed in key-value pairs
          */
@@ -795,6 +812,33 @@ public class StreamThread extends Thread {
             return sensor;
         }
 
+        @Override
+        public Sensor addCacheSensor(String entityName, String operationName, String... tags) {
+            // extract the additional tags if there are any
+            Map<String, String> tagMap = new HashMap<>(this.metricTags);
+            if ((tags.length % 2) != 0)
+                throw new IllegalArgumentException("Tags needs to be specified 
in key-value pairs");
+
+            for (int i = 0; i < tags.length; i += 2)
+                tagMap.put(tags[i], tags[i + 1]);
+
+            String metricGroupName = "stream-thread-cache-metrics";
+
+            Sensor sensor = metrics.sensor(sensorNamePrefix + "-" + entityName + "-" + operationName);
+            addCacheMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
+            return sensor;
+
+        }
+
+        private void addCacheMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg", metricGrpName,
+                "The current count of " + entityName + " " + opName + " operation.", tags), new Avg());
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-min", metricGrpName,
+                "The current count of " + entityName + " " + opName + " operation.", tags), new Min());
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max", metricGrpName,
+                "The current count of " + entityName + " " + opName + " operation.", tags), new Max());
+        }
+
         private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
             maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg-latency-ms", metricGrpName,
                 "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());

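The new constructor logic above splits one application-wide cache budget evenly across stream threads: the configured byte total is divided by the number of threads and clamped at zero. A minimal configuration sketch using the two config keys read above (the application id and bootstrap servers are placeholder values):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");         // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    // 10 MB shared across the application; each thread's ThreadCache gets 10 MB / 4
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    StreamsConfig config = new StreamsConfig(props);
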
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 03c0d02..c4e0b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -94,6 +94,8 @@ public class Stores {
                             @Override
                             public PersistentKeyValueFactory<K, V> persistent() {
                                 return new PersistentKeyValueFactory<K, V>() {
+                                    public boolean cachingEnabled;
+                                    private long windowSize;
                                     private final Map<String, String> logConfig = new HashMap<>();
                                     private int numSegments = 0;
                                     private long retentionPeriod = 0L;
@@ -101,7 +103,8 @@ public class Stores {
                                     private boolean logged = true;
 
                                     @Override
-                                    public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
+                                    public PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates) {
+                                        this.windowSize = windowSize;
                                         this.numSegments = numSegments;
                                         this.retentionPeriod = retentionPeriod;
                                         this.retainDuplicates = retainDuplicates;
@@ -124,13 +127,19 @@ public class Stores {
                                     }
 
                                     @Override
+                                    public PersistentKeyValueFactory<K, V> enableCaching() {
+                                        cachingEnabled = true;
+                                        return this;
+                                    }
+
+                                    @Override
                                     public StateStoreSupplier build() {
                                         if (numSegments > 0) {
-                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, logged, logConfig);
+                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
                                         }
-
-                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
+                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig, cachingEnabled);
                                     }
+
                                 };
                             }
 
@@ -367,11 +376,12 @@ public class Stores {
         /**
          * Set the persistent store as a windowed key-value store
          *
+         * @param windowSize size of the windows
          * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
          * @param numSegments the maximum number of segments for rolling the windowed store
          * @param retainDuplicates whether or not to retain duplicate data within the window
          */
-        PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
+        PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
 
         /**
          * Indicates that a changelog should be created for the store. The changelog will be created
@@ -389,10 +399,12 @@ public class Stores {
          */
         PersistentKeyValueFactory<K, V> disableLogging();
 
+        PersistentKeyValueFactory<K, V> enableCaching();
         /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
         StateStoreSupplier build();
+
     }
 }

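With the Stores changes above, windowed() now takes the window size as its first argument and enableCaching() opts the store into the new caching layer. A usage sketch, assuming the existing Stores.create(...).withStringKeys().withLongValues() fluent entry points of this class:

    StateStoreSupplier supplier = Stores.create("counts")
        .withStringKeys()
        .withLongValues()
        .persistent()
        .windowed(60 * 1000L, 24 * 60 * 60 * 1000L, 3, false) // windowSize, retention, segments, retainDuplicates
        .enableCaching()
        .build();
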
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 64d6e07..39a33a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Map;
 
+
 public abstract class AbstractStoreSupplier<K, V> implements StateStoreSupplier {
     protected final String name;
     protected final Serde<K> keySerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
new file mode 100644
index 0000000..a1dcf34
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+
+public interface CachedStateStore<K, V> {
+    /**
+     * Set the {@link CacheFlushListener} to be notified when entries are flushed from the
+     * cache to the underlying {@link org.apache.kafka.streams.processor.StateStore}
+     * @param listener
+     */
+    void setFlushListener(final CacheFlushListener<K, V> listener);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
new file mode 100644
index 0000000..81ff5b5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> {
+
+    private final KeyValueStore<Bytes, byte[]> underlying;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private CacheFlushListener<K, V> flushListener;
+    private String name;
+    private ThreadCache cache;
+    private InternalProcessorContext context;
+    private StateSerdes<K, V> serdes;
+    private Thread streamThread;
+
+    CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying,
+                         final Serde<K> keySerde,
+                         final Serde<V> valueSerde) {
+        this.underlying = underlying;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    @Override
+    public String name() {
+        return underlying.name();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        underlying.init(context, root);
+        initInternal(context);
+        // save the stream thread as we only ever want to trigger a flush
+        // when the stream thread is the current thread.
+        streamThread = Thread.currentThread();
+    }
+
+    @SuppressWarnings("unchecked")
+    void initInternal(final ProcessorContext context) {
+        this.context = (InternalProcessorContext) context;
+        this.serdes = new StateSerdes<>(underlying.name(),
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        this.name = context.taskId() + "-" + underlying.name();
+        this.cache = this.context.getCache();
+        cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> entries) {
+                final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
+                for (ThreadCache.DirtyEntry entry : entries) {
+                    keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
+                    maybeForward(entry, (InternalProcessorContext) context);
+                }
+                underlying.putAll(keyValues);
+            }
+        });
+
+    }
+
+    private void maybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+        if (flushListener != null) {
+            final RecordContext current = context.recordContext();
+            context.setRecordContext(entry.recordContext());
+            try {
+                flushListener.apply(serdes.keyFrom(entry.key().get()),
+                                    serdes.valueFrom(entry.newValue()), serdes.valueFrom(underlying.get(entry.key())));
+            } finally {
+                context.setRecordContext(current);
+            }
+        }
+    }
+
+    public void setFlushListener(final CacheFlushListener<K, V> flushListener) {
+        this.flushListener = flushListener;
+    }
+
+    @Override
+    public synchronized void flush() {
+        cache.flush(name);
+        underlying.flush();
+    }
+
+    @Override
+    public void close() {
+        flush();
+        underlying.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return underlying.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return underlying.isOpen();
+    }
+
+    @Override
+    public synchronized V get(final K key) {
+        final byte[] rawKey = serdes.rawKey(key);
+        return get(rawKey);
+    }
+
+    private V get(final byte[] rawKey) {
+        final LRUCacheEntry entry = cache.get(name, rawKey);
+        if (entry == null) {
+            final byte[] rawValue = underlying.get(Bytes.wrap(rawKey));
+            if (rawValue == null) {
+                return null;
+            }
+            // only update the cache if this call is on the streamThread
+            // as we don't want other threads to trigger an eviction/flush
+            if (Thread.currentThread().equals(streamThread)) {
+                cache.put(name, rawKey, new LRUCacheEntry(rawValue));
+            }
+            return serdes.valueFrom(rawValue);
+        }
+
+        if (entry.value == null) {
+            return null;
+        }
+
+        return serdes.valueFrom(entry.value);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        final byte[] origFrom = serdes.rawKey(from);
+        final byte[] origTo = serdes.rawKey(to);
+        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo)));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, origFrom, origTo);
+        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.all());
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
+        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+    }
+
+    @Override
+    public synchronized long approximateNumEntries() {
+        return underlying.approximateNumEntries();
+    }
+
+    @Override
+    public synchronized void put(final K key, final V value) {
+        put(serdes.rawKey(key), value);
+    }
+
+    private synchronized void put(final byte[] rawKey, final V value) {
+        final byte[] rawValue = serdes.rawValue(value);
+        cache.put(name, rawKey, new LRUCacheEntry(rawValue, true, context.offset(),
+                                                  context.timestamp(), context.partition(), context.topic()));
+    }
+
+    @Override
+    public synchronized V putIfAbsent(final K key, final V value) {
+        final byte[] rawKey = serdes.rawKey(key);
+        final V v = get(rawKey);
+        if (v == null) {
+            put(rawKey, value);
+        }
+        return v;
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries) {
+            put(entry.key, entry.value);
+        }
+    }
+
+    @Override
+    public synchronized V delete(final K key) {
+        final byte[] rawKey = serdes.rawKey(key);
+        final V v = get(rawKey);
+        put(rawKey, null);
+        return v;
+    }
+
+    KeyValueStore<Bytes, byte[]> underlying() {
+        return underlying;
+    }
+}

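CachingKeyValueStore writes go to the ThreadCache first and only reach the underlying store when dirty entries are flushed or evicted; maybeForward() then hands each flushed record to the registered listener with the key, the new value, and the old value read back from the underlying store. A hypothetical listener wired onto a CachedStateStore (the cachedStore variable and the String/Long serde types are assumptions for illustration):

    cachedStore.setFlushListener(new CacheFlushListener<String, Long>() {
        @Override
        public void apply(final String key, final Long newValue, final Long oldValue) {
            // invoked on flush/eviction, with the entry's record context set
            System.out.println("flushed " + key + ": " + oldValue + " -> " + newValue);
        }
    });
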
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
new file mode 100644
index 0000000..7ff4044
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.List;
+
+class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
+
+    private final WindowStore<Bytes, byte[]> underlying;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private CacheFlushListener<Windowed<K>, V> flushListener;
+    private final long windowSize;
+    private String name;
+    private ThreadCache cache;
+    private InternalProcessorContext context;
+    private StateSerdes<K, V> serdes;
+
+    CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
+                       final Serde<K> keySerde,
+                       final Serde<V> valueSerde,
+                       final long windowSize) {
+        this.underlying = underlying;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.windowSize = windowSize;
+    }
+
+    @Override
+    public String name() {
+        return underlying.name();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        underlying.init(context, root);
+        initInternal(context);
+    }
+
+    @SuppressWarnings("unchecked")
+    void initInternal(final ProcessorContext context) {
+        this.context = (InternalProcessorContext) context;
+        this.serdes = new StateSerdes<>(underlying.name(),
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        this.name = context.taskId() + "-" + underlying.name();
+        this.cache = this.context.getCache();
+        cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> entries) {
+                for (ThreadCache.DirtyEntry entry : entries) {
+                    final byte[] binaryKey = entry.key().get();
+                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryKey);
+                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+                    final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes),
+                                                                   new TimeWindow(timestamp, timestamp + windowSize));
+                    maybeForward(entry, key, timestamp, windowedKey, (InternalProcessorContext) context);
+                    underlying.put(key, entry.newValue(), timestamp);
+                }
+            }
+        });
+
+    }
+
+    private void maybeForward(final ThreadCache.DirtyEntry entry,
+                              final Bytes key,
+                              final long timestamp,
+                              final Windowed<K> windowedKey,
+                              final InternalProcessorContext context) {
+        if (flushListener != null) {
+            final RecordContext current = context.recordContext();
+            context.setRecordContext(entry.recordContext());
+            try {
+                flushListener.apply(windowedKey,
+                                    serdes.valueFrom(entry.newValue()), fetchPrevious(key, timestamp));
+            } finally {
+                context.setRecordContext(current);
+            }
+
+        }
+    }
+
+    public void setFlushListener(CacheFlushListener<Windowed<K>, V> flushListener) {
+        this.flushListener = flushListener;
+    }
+
+    @Override
+    public synchronized void flush() {
+        cache.flush(name);
+        underlying.flush();
+    }
+
+    @Override
+    public void close() {
+        flush();
+        underlying.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return underlying.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return underlying.isOpen();
+    }
+
+    @Override
+    public synchronized void put(final K key, final V value) {
+        put(key, value, context.timestamp());
+    }
+
+
+    @Override
+    public synchronized void put(final K key, final V value, final long timestamp) {
+        final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
+        final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
+                                                      timestamp, context.partition(), context.topic());
+        cache.put(name, binaryKey, entry);
+    }
+
+    @Override
+    public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
+        byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
+
+        final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo);
+        return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new DelegatingPeekingWindowIterator<>(underlyingIterator), serdes);
+    }
+
+    private V fetchPrevious(final Bytes key, final long timestamp) {
+        final WindowStoreIterator<byte[]> iterator = underlying.fetch(key, timestamp, timestamp);
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        return serdes.valueFrom(iterator.next().value);
+    }
+
+}

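On flush, CachingWindowStore reconstructs the windowed key from the binary cache key: the embedded timestamp becomes the window start, and the fixed windowSize supplied at construction determines the window end. A self-contained sketch of that arithmetic with made-up values:

    final long windowSize = 5000L;   // hypothetical, from the store supplier
    final long timestamp = 15000L;   // hypothetical, recovered from the binary key
    // equivalent to: new TimeWindow(timestamp, timestamp + windowSize)
    System.out.println("window [" + timestamp + ", " + (timestamp + windowSize) + ")");
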
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
new file mode 100644
index 0000000..38d5108
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.NoSuchElementException;
+
+class DelegatingPeekingKeyValueIterator<K, V> implements PeekingKeyValueIterator<K, V> {
+    private final KeyValueIterator<K, V> underlying;
+    private KeyValue<K, V> next;
+
+    public DelegatingPeekingKeyValueIterator(final KeyValueIterator<K, V> underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public K peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return next.key;
+    }
+
+    @Override
+    public void close() {
+        underlying.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (next != null) {
+            return true;
+        }
+
+        if (!underlying.hasNext()) {
+            return false;
+        }
+
+        next = underlying.next();
+        return true;
+    }
+
+    @Override
+    public KeyValue<K, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        final KeyValue<K, V> result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported");
+    }
+}

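The peeking wrapper buffers one element so callers can inspect the next key without consuming it, which the merged cache/store iterators rely on to decide which side to advance. A hypothetical usage sketch (store is an assumed KeyValueStore<Bytes, byte[]>):

    final PeekingKeyValueIterator<Bytes, byte[]> iterator =
            new DelegatingPeekingKeyValueIterator<>(store.all());
    while (iterator.hasNext()) {
        final Bytes key = iterator.peekNextKey();        // look ahead without consuming
        final KeyValue<Bytes, byte[]> kv = iterator.next(); // now consume it
    }
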
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
new file mode 100644
index 0000000..402a363
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+class DelegatingPeekingWindowIterator<V> implements PeekingWindowIterator<V> {
+    private final WindowStoreIterator<V> underlying;
+    private KeyValue<Long, V> next;
+
+    public DelegatingPeekingWindowIterator(final WindowStoreIterator<V> underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public KeyValue<Long, V> peekNext() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return next;
+    }
+
+    @Override
+    public void close() {
+        underlying.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (next != null) {
+            return true;
+        }
+
+        if (!underlying.hasNext()) {
+            return false;
+        }
+
+        next = underlying.next();
+        return true;
+    }
+
+    @Override
+    public KeyValue<Long, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        final KeyValue<Long, V> result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
new file mode 100644
index 0000000..e77c642
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.RecordContext;
+
+/**
+ * A cache entry
+ */
+class LRUCacheEntry implements RecordContext {
+
+    public final byte[] value;
+    private final long offset;
+    private final long timestamp;
+    private final String topic;
+    boolean isDirty;
+    private final int partition;
+    private long sizeBytes = 0;
+
+
+    LRUCacheEntry(final byte[] value) {
+        this(value, false, -1, -1, -1, "");
+    }
+
+    LRUCacheEntry(final byte[] value, final boolean isDirty,
+                  final long offset, final long timestamp, final int partition,
+                  final String topic) {
+        this.value = value;
+        this.partition = partition;
+        this.topic = topic;
+        this.offset = offset;
+        this.isDirty = isDirty;
+        this.timestamp = timestamp;
+        this.sizeBytes = (value == null ? 0 : value.length) +
+                1 + // isDirty
+                8 + // timestamp
+                8 + // offset
+                4 + // partition
+                topic.length();
+
+    }
+
+
+
+    void markClean() {
+        isDirty = false;
+    }
+
+    @Override
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String topic() {
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        return partition;
+    }
+
+    boolean isDirty() {
+        return isDirty;
+    }
+
+    public long size() {
+        return sizeBytes;
+    }
+
+
+}

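The size accounting above charges each entry for its value bytes plus the fixed overhead of the record context it carries. A worked example with hypothetical values:

    final int valueLength = 10;   // hypothetical value size
    final String topic = "t";     // hypothetical 1-character topic name
    final long size = valueLength
            + 1               // isDirty flag
            + 8               // timestamp
            + 8               // offset
            + 4               // partition
            + topic.length(); // topic characters
    System.out.println(size);     // 32 bytes charged against the thread cache
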
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
new file mode 100644
index 0000000..23bbe7f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param <K>
+ * @param <V>
+ */
+class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K, V> {
+    private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
+    private final PeekingKeyValueIterator<Bytes, byte[]> storeIterator;
+    private final StateSerdes<K, V> serdes;
+    private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+
+    public MergedSortedCacheKeyValueStoreIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
+                                                  final PeekingKeyValueIterator<Bytes, byte[]> storeIterator,
+                                                  final StateSerdes<K, V> serdes) {
+        this.cacheIterator = cacheIterator;
+        this.storeIterator = storeIterator;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return cacheIterator.hasNext() || storeIterator.hasNext();
+    }
+
+
+    @Override
+    public KeyValue<K, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        byte[] nextCacheKey = null;
+        if (cacheIterator.hasNext()) {
+            nextCacheKey = cacheIterator.peekNextKey();
+        }
+
+        byte[] nextStoreKey = null;
+        if (storeIterator.hasNext()) {
+            nextStoreKey = storeIterator.peekNextKey().get();
+        }
+
+        if (nextCacheKey == null) {
+            return nextStoreValue();
+        }
+
+        if (nextStoreKey == null) {
+            return nextCacheValue();
+        }
+
+        final int comparison = comparator.compare(nextCacheKey, nextStoreKey);
+        if (comparison > 0) {
+            return nextStoreValue();
+        } else if (comparison < 0) {
+            return nextCacheValue();
+        } else {
+            storeIterator.next();
+            return nextCacheValue();
+        }
+
+    }
+
+    private KeyValue<K, V> nextCacheValue() {
+        final KeyValue<byte[], LRUCacheEntry> next = cacheIterator.next();
+        return KeyValue.pair(serdes.keyFrom(next.key), serdes.valueFrom(next.value.value));
+    }
+
+    private KeyValue<K, V> nextStoreValue() {
+        final KeyValue<Bytes, byte[]> next = storeIterator.next();
+        return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value));
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported");
+    }
+
+    @Override
+    public void close() {
+        cacheIterator.close();
+        storeIterator.close();
+    }
+}

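The merge keeps both inputs in key order and lets the cache shadow the store: when the two iterators are positioned on the same key, the store entry is consumed and discarded and the cached entry (the newer write) is returned. A self-contained sketch of the same rule over two sorted maps (all names and values are illustrative; requires java.util.Iterator, java.util.Map, and java.util.TreeMap):

    TreeMap<String, String> cache = new TreeMap<>();
    cache.put("a", "1-cache");
    cache.put("c", "3-cache");
    TreeMap<String, String> store = new TreeMap<>();
    store.put("a", "1-store");
    store.put("b", "2-store");

    Iterator<Map.Entry<String, String>> ci = cache.entrySet().iterator();
    Iterator<Map.Entry<String, String>> si = store.entrySet().iterator();
    Map.Entry<String, String> c = ci.hasNext() ? ci.next() : null;
    Map.Entry<String, String> s = si.hasNext() ? si.next() : null;
    while (c != null || s != null) {
        final int cmp = c == null ? 1 : s == null ? -1 : c.getKey().compareTo(s.getKey());
        if (cmp <= 0) {
            System.out.println(c.getKey() + " -> " + c.getValue()); // cache entry wins
            if (cmp == 0) {
                s = si.hasNext() ? si.next() : null; // drop the shadowed store entry
            }
            c = ci.hasNext() ? ci.next() : null;
        } else {
            System.out.println(s.getKey() + " -> " + s.getValue()); // store-only entry
            s = si.hasNext() ? si.next() : null;
        }
    }
    // prints: a -> 1-cache, b -> 2-store, c -> 3-cache
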
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
new file mode 100644
index 0000000..68f147f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param <K>
+ * @param <V>
+ */
+class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator<V> {
+    private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
+    private final PeekingWindowIterator<byte[]> storeIterator;
+    private final StateSerdes<K, V> serdes;
+
+    public MergedSortedCachedWindowStoreIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
+                                                 final PeekingWindowIterator<byte[]> storeIterator,
+                                                 final StateSerdes<K, V> serdes) {
+        this.cacheIterator = cacheIterator;
+        this.storeIterator = storeIterator;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return cacheIterator.hasNext() || storeIterator.hasNext();
+    }
+
+
+    @Override
+    public KeyValue<Long, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        Long nextCacheTimestamp = null;
+        if (cacheIterator.hasNext()) {
+            nextCacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheIterator.peekNextKey());
+        }
+
+        Long nextStoreTimestamp = null;
+        if (storeIterator.hasNext()) {
+            nextStoreTimestamp = storeIterator.peekNext().key;
+        }
+
+        if (nextCacheTimestamp == null) {
+            return nextStoreValue();
+        }
+
+        if (nextStoreTimestamp == null) {
+            return nextCacheValue(nextCacheTimestamp);
+        }
+
+        final int comparison = nextCacheTimestamp.compareTo(nextStoreTimestamp);
+        if (comparison > 0) {
+            return nextStoreValue();
+        } else if (comparison < 0) {
+            return nextCacheValue(nextCacheTimestamp);
+        } else {
+            storeIterator.next();
+            return nextCacheValue(nextCacheTimestamp);
+        }
+    }
+
+    private KeyValue<Long, V> nextCacheValue(final Long timestamp) {
+        final KeyValue<byte[], LRUCacheEntry> next = cacheIterator.next();
+        return KeyValue.pair(timestamp, serdes.valueFrom(next.value.value));
+    }
+
+    private KeyValue<Long, V> nextStoreValue() {
+        final KeyValue<Long, byte[]> next = storeIterator.next();
+        return KeyValue.pair(next.key, serdes.valueFrom(next.value));
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported");
+    }
+
+    @Override
+    public void close() {
+        cacheIterator.close();
+        storeIterator.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
new file mode 100644
index 0000000..a3a0078
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+class NamedCache {
+    private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
+    private final String name;
+    private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>();
+    private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
+    private ThreadCache.DirtyEntryFlushListener listener;
+    private LRUNode tail;
+    private LRUNode head;
+    private long currentSizeBytes;
+    private ThreadCacheMetrics metrics;
+
+    // JMX stats
+    private Sensor hitRatio = null;
+
+
+    // internal stats
+    private long numReadHits = 0;
+    private long numReadMisses = 0;
+    private long numOverwrites = 0;
+    private long numFlushes = 0;
+
+    NamedCache(final String name) {
+        this(name, null);
+    }
+
+    NamedCache(final String name, final ThreadCacheMetrics metrics) {
+        this.name = name;
+        this.metrics = metrics != null ? metrics : new ThreadCache.NullThreadCacheMetrics();
+
+        this.hitRatio = this.metrics.addCacheSensor(name, "hitRatio");
+    }
+
+    synchronized long hits() {
+        return numReadHits;
+    }
+
+    synchronized long misses() {
+        return numReadMisses;
+    }
+
+    synchronized long overwrites() {
+        return numOverwrites;
+    }
+
+    synchronized long flushes() {
+        return numFlushes;
+    }
+
+    synchronized LRUCacheEntry get(final Bytes key) {
+        final LRUNode node = getInternal(key);
+        if (node == null) {
+            return null;
+        }
+        updateLRU(node);
+        return node.entry;
+    }
+
+    synchronized void setListener(final ThreadCache.DirtyEntryFlushListener listener) {
+        this.listener = listener;
+    }
+
+    synchronized void flush() {
+        numFlushes++;
+
+        log.debug("Named cache {} stats on flush: #hits={}, #misses={}, 
#overwrites={}, #flushes={}",
+            name, hits(), misses(), overwrites(), flushes());
+
+        if (listener == null) {
+            throw new IllegalArgumentException("No listener for namespace " + 
name + " registered with cache");
+        }
+
+        if (dirtyKeys.isEmpty()) {
+            return;
+        }
+
+        final List<ThreadCache.DirtyEntry> entries  = new ArrayList<>();
+        for (Bytes key : dirtyKeys) {
+            final LRUNode node = getInternal(key);
+            if (node == null) {
+                throw new IllegalStateException("Key found in dirty key set, 
but entry is null");
+            }
+            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
+            node.entry.markClean();
+        }
+        listener.apply(entries);
+        dirtyKeys.clear();
+    }
+
+
+    synchronized void put(final Bytes key, final LRUCacheEntry value) {
+        LRUNode node = cache.get(key);
+        if (node != null) {
+            numOverwrites++;
+
+            currentSizeBytes -= node.size();
+            node.update(value);
+            updateLRU(node);
+        } else {
+            node = new LRUNode(key, value);
+            // put element
+            putHead(node);
+            cache.put(key, node);
+        }
+        if (value.isDirty()) {
+            // first remove and then add so we can maintain ordering as the arrival order of the records.
+            dirtyKeys.remove(key);
+            dirtyKeys.add(key);
+        }
+        currentSizeBytes += node.size();
+    }
+
+    synchronized long sizeInBytes() {
+        return currentSizeBytes;
+    }
+
+    private LRUNode getInternal(final Bytes key) {
+        final LRUNode node = cache.get(key);
+        if (node == null) {
+            numReadMisses++;
+
+            return null;
+        } else {
+            numReadHits++;
+            metrics.recordCacheSensor(hitRatio, (double) numReadHits / (double) (numReadHits + numReadMisses));
+        }
+        return node;
+    }
+
+    private void updateLRU(LRUNode node) {
+        remove(node);
+
+        putHead(node);
+    }
+
+    private void remove(LRUNode node) {
+        if (node.previous != null) {
+            node.previous.next = node.next;
+        } else {
+            head = node.next;
+        }
+        if (node.next != null) {
+            node.next.previous = node.previous;
+        } else {
+            tail = node.previous;
+        }
+    }
+
+    private void putHead(LRUNode node) {
+        node.next = head;
+        node.previous = null;
+        if (head != null) {
+            head.previous = node;
+        }
+        head = node;
+        if (tail == null) {
+            tail = head;
+        }
+    }
+
+    synchronized void evict() {
+        final LRUNode eldest = tail;
+        currentSizeBytes -= eldest.size();
+        if (eldest.entry.isDirty()) {
+            flush();
+        }
+        remove(eldest);
+        cache.remove(eldest.key);
+    }
+
+    synchronized LRUCacheEntry putIfAbsent(final Bytes key, final LRUCacheEntry value) {
+        final LRUCacheEntry originalValue = get(key);
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue;
+    }
+
+    synchronized void putAll(final List<KeyValue<byte[], LRUCacheEntry>> entries) {
+        for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+            put(Bytes.wrap(entry.key), entry.value);
+        }
+    }
+
+    synchronized LRUCacheEntry delete(final Bytes key) {
+        final LRUNode node = cache.remove(key);
+
+        if (node == null) {
+            return null;
+        }
+
+        remove(node);
+        dirtyKeys.remove(key);
+        currentSizeBytes -= node.size();
+        return node.entry();
+    }
+
+    public long size() {
+        return cache.size();
+    }
+
+    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
+    }
+
+    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
+        final TreeSet<Bytes> copy = new TreeSet<>(keySet);
+        return copy.iterator();
+    }
+
+    synchronized Iterator<Bytes> allKeys() {
+        return keySetIterator(cache.navigableKeySet());
+    }
+
+    synchronized LRUCacheEntry first() {
+        if (head == null) {
+            return null;
+        }
+        return head.entry;
+    }
+
+    synchronized LRUCacheEntry last() {
+        if (tail == null) {
+            return null;
+        }
+        return tail.entry;
+    }
+
+    synchronized long dirtySize() {
+        return dirtyKeys.size();
+    }
+
+    /**
+     * A simple wrapper class to implement a doubly-linked list around LRUCacheEntry.
+     */
+    private class LRUNode {
+        private final Bytes key;
+        private LRUCacheEntry entry;
+        private LRUNode previous;
+        private LRUNode next;
+
+        LRUNode(final Bytes key, final LRUCacheEntry entry) {
+            this.key = key;
+            this.entry = entry;
+        }
+
+        public LRUCacheEntry entry() {
+            return entry;
+        }
+
+        public void update(LRUCacheEntry entry) {
+            this.entry = entry;
+        }
+
+        public long size() {
+            return key.get().length +
+                    8 + // entry
+                    8 + // previous
+                    8 + // next
+                    entry.size();
+        }
+    }
+
+}
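
The putHead()/remove() pair above is the standard doubly-linked-list
bookkeeping behind an LRU cache: every access moves the touched node to the
head, so the tail is always the least-recently-used candidate for evict().
A minimal, self-contained sketch of the same pointer surgery (the class and
method names below are illustrative only, not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    class LruSketch<K, V> {
        private final class Node {
            final K key;
            V value;
            Node prev, next;
            Node(final K key, final V value) { this.key = key; this.value = value; }
        }

        private final Map<K, Node> index = new HashMap<>();
        private Node head, tail;

        void put(final K key, final V value) {
            Node node = index.get(key);
            if (node != null) {
                node.value = value;      // overwrite in place ...
                unlink(node);            // ... and refresh its recency
            } else {
                node = new Node(key, value);
                index.put(key, node);
            }
            linkAtHead(node);
        }

        V get(final K key) {
            final Node node = index.get(key);
            if (node == null) {
                return null;
            }
            unlink(node);                // a hit also moves the node to the head
            linkAtHead(node);
            return node.value;
        }

        K evict() {                      // drop the least-recently-used entry
            if (tail == null) {
                return null;
            }
            final Node eldest = tail;
            unlink(eldest);
            index.remove(eldest.key);
            return eldest.key;
        }

        private void unlink(final Node node) {
            if (node.prev != null) node.prev.next = node.next; else head = node.next;
            if (node.next != null) node.next.prev = node.prev; else tail = node.prev;
        }

        private void linkAtHead(final Node node) {
            node.prev = null;
            node.next = head;
            if (head != null) head.prev = node;
            head = node;
            if (tail == null) tail = head;
        }
    }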

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
new file mode 100644
index 0000000..9a3a05c
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+public interface PeekingKeyValueIterator<K, V> extends KeyValueIterator<K, V> {
+
+    K peekNextKey();
+}
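
A peeking iterator like this is typically implemented by buffering one
element of lookahead over a plain iterator. A minimal sketch under that
assumption (the wrapper below is hypothetical, not the implementation
Streams actually uses):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    class PeekingIteratorSketch<T> implements Iterator<T> {
        private final Iterator<T> inner;
        private T buffered;              // the element peek() has already pulled
        private boolean hasBuffered;

        PeekingIteratorSketch(final Iterator<T> inner) { this.inner = inner; }

        T peek() {                       // inspect the next element without consuming it
            if (!hasBuffered) {
                if (!inner.hasNext()) {
                    throw new NoSuchElementException();
                }
                buffered = inner.next();
                hasBuffered = true;
            }
            return buffered;
        }

        @Override
        public boolean hasNext() {
            return hasBuffered || inner.hasNext();
        }

        @Override
        public T next() {
            final T result = peek();     // fills the buffer if necessary
            hasBuffered = false;
            buffered = null;
            return result;
        }
    }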

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
new file mode 100644
index 0000000..c112169
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+public interface PeekingWindowIterator<V> extends WindowStoreIterator<V> {
+
+    KeyValue<Long, V> peekNext();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index c10b7e1..68a4429 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -31,19 +33,31 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, keySerde, valueSerde, null, logged, logConfig);
+public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+
+    private final boolean enableCaching;
+
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
+        this(name, keySerde, valueSerde, null, logged, logConfig, enableCaching);
     }
 
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
+        this.enableCaching = enableCaching;
     }
 
     public StateStore get() {
-        RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
-        return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time);
+        if (!enableCaching) {
+            RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
+            return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time);
+        }
+
+        final RocksDBStore<Bytes, byte[]> store = new RocksDBStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
+        return new CachingKeyValueStore<>(new MeteredKeyValueStore<>(logged ? store.enableLogging() : store,
+                                                                     "rocksdb-state",
+                                                                     time),
+                                          keySerde,
+                                          valueSerde);
     }
-
 }
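
Note the wrapping order chosen in get() above: caching is the outermost
decorator, metering sits in the middle, and the raw store (keyed on plain
bytes once caching is enabled) is innermost, so a cache hit never reaches
the metered store at all and the user-facing serdes move up into the
caching layer. A simplified, hypothetical sketch of that decorator layering
(none of these types are the actual Streams classes):

    import java.util.HashMap;
    import java.util.Map;

    interface SimpleStore {
        String get(String key);
    }

    final class MeteredStore implements SimpleStore {
        private final SimpleStore inner;
        MeteredStore(final SimpleStore inner) { this.inner = inner; }
        @Override
        public String get(final String key) {
            final long start = System.nanoTime();
            try {
                return inner.get(key);
            } finally {
                System.out.printf("get(%s) took %d ns%n", key, System.nanoTime() - start);
            }
        }
    }

    final class CachingStore implements SimpleStore {
        private final SimpleStore inner;
        private final Map<String, String> cache = new HashMap<>();
        CachingStore(final SimpleStore inner) { this.inner = inner; }
        @Override
        public String get(final String key) {
            final String hit = cache.get(key);
            if (hit != null) {
                return hit;              // hit: the metered layer is bypassed entirely
            }
            final String loaded = inner.get(key);
            if (loaded != null) {
                cache.put(key, loaded);  // warm the cache on a miss
            }
            return loaded;
        }
    }

    // Wiring mirrors get() above: new CachingStore(new MeteredStore(rawStore))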

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6bd0f92..6a34ef9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.StateSerdes;
-
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -44,13 +43,10 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 
 /**
  * A persistent key-value store based on RocksDB.
@@ -69,7 +65,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private static final int TTL_NOT_USED = -1;
 
     // TODO: these values should be configurable
-    private static final int DEFAULT_UNENCODED_CACHE_SIZE = 1000;
     private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
     private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
     private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
@@ -95,10 +90,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private FlushOptions fOptions;
 
     private boolean loggingEnabled = false;
-    private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
 
-    private Set<K> cacheDirtyKeys;
-    private MemoryLRUCache<K, RocksDBCacheEntry> cache;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
@@ -110,16 +102,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return this;
     }
 
-    public RocksDBStore<K, V> withCacheSize(int cacheSize) {
-        this.cacheSize = cacheSize;
-
-        return this;
-    }
-
     public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
     public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
@@ -147,20 +134,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         fOptions.setWaitForFlush(true);
     }
 
-    private class RocksDBCacheEntry {
-        public V value;
-        public boolean isDirty;
-
-        public RocksDBCacheEntry(V value) {
-            this(value, false);
-        }
-
-        public RocksDBCacheEntry(V value, boolean isDirty) {
-            this.value = value;
-            this.isDirty = isDirty;
-        }
-    }
-
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
         final Map<String, Object> configs = context.appConfigs();
@@ -184,25 +157,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         openDB(context);
 
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
-
-        if (this.cacheSize > 0) {
-            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize, null, null)
-                    .whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
-                        @Override
-                        public void apply(K key, RocksDBCacheEntry entry) {
-                            // flush all the dirty entries to RocksDB if this evicted entry is dirty
-                            if (entry.isDirty) {
-                                flushCache();
-                            }
-                        }
-                    });
-
-            this.cacheDirtyKeys = new HashSet<>();
-        } else {
-            this.cache = null;
-            this.cacheDirtyKeys = null;
-        }
-
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
         this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
@@ -255,30 +209,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public synchronized V get(K key) {
         validateStoreOpen();
-        if (cache != null) {
-            RocksDBCacheEntry entry = cache.get(key);
-            if (entry == null) {
-                byte[] byteValue = getInternal(serdes.rawKey(key));
-                //Check value for null, to avoid  deserialization error
-                if (byteValue == null) {
-                    return null;
-                } else {
-                    V value = serdes.valueFrom(byteValue);
-                    cache.put(key, new RocksDBCacheEntry(value));
-                    return value;
-                }
-            } else {
-                return entry.value;
-            }
+        byte[] byteValue = getInternal(serdes.rawKey(key));
+        if (byteValue == null) {
+            return null;
         } else {
-            byte[] byteValue = getInternal(serdes.rawKey(key));
-            if (byteValue == null) {
-                return null;
-            } else {
-                return serdes.valueFrom(byteValue);
-            }
+            return serdes.valueFrom(byteValue);
         }
-
     }
 
     private void validateStoreOpen() {
@@ -296,23 +232,23 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(K key, V value) {
         validateStoreOpen();
-        if (cache != null) {
-            cacheDirtyKeys.add(key);
-            cache.put(key, new RocksDBCacheEntry(value, true));
-        } else {
-            byte[] rawKey = serdes.rawKey(key);
-            byte[] rawValue = serdes.rawValue(value);
-            putInternal(rawKey, rawValue);
+        byte[] rawKey = serdes.rawKey(key);
+        byte[] rawValue = serdes.rawValue(value);
+        putInternal(rawKey, rawValue);
 
-            if (loggingEnabled) {
-                changeLogger.add(Bytes.wrap(rawKey));
-                changeLogger.maybeLogChange(this.getter);
-            }
+        if (loggingEnabled) {
+            changeLogger.add(Bytes.wrap(rawKey));
+            changeLogger.maybeLogChange(this.getter);
         }
+    }
 
+    @SuppressWarnings("unchecked")
+    synchronized void writeToStore(K key, V value) {
+        putInternal(serdes.rawKey(key), serdes.rawValue(value));
     }
 
     @Override
@@ -344,21 +280,26 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void putAll(List<KeyValue<K, V>> entries) {
-        for (KeyValue<K, V> entry : entries)
-            put(entry.key, entry.value);
-    }
-
-    // this function is only called in flushCache()
-    private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
         try (WriteBatch batch = new WriteBatch()) {
-            for (KeyValue<byte[], byte[]> entry : entries) {
-                batch.put(entry.key, entry.value);
+            for (KeyValue<K, V> entry : entries) {
+                final byte[] rawKey = serdes.rawKey(entry.key);
+                if (entry.value == null) {
+                    batch.remove(rawKey); // route deletes through the batch so the whole putAll stays atomic
+                } else {
+                    batch.put(rawKey, serdes.rawValue(entry.value));
+                    if (loggingEnabled) {
+                        changeLogger.add(Bytes.wrap(rawKey));
+                    }
+                }
             }
-
             db.write(wOptions, batch);
+            if (loggingEnabled) {
+                changeLogger.maybeLogChange(getter);
+            }
         } catch (RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to 
store " + this.name, e);
         }
     }
 
     @Override
@@ -371,20 +312,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public synchronized KeyValueIterator<K, V> range(K from, K to) {
         validateStoreOpen();
-        // we need to flush the cache if necessary before returning the iterator
-        if (cache != null)
-            flushCache();
-
+        // query rocksdb
         return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to);
     }
 
     @Override
     public synchronized KeyValueIterator<K, V> all() {
         validateStoreOpen();
-        // we need to flush the cache if necessary before returning the iterator
-        if (cache != null)
-            flushCache();
-
+        // query rocksdb
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
         return new RocksDbIterator<>(innerIter, serdes);
@@ -412,12 +347,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (isOverflowing(value)) {
             return Long.MAX_VALUE;
         }
-        if (this.cacheDirtyKeys != null) {
-            value += this.cacheDirtyKeys.size();
-        }
-        if (isOverflowing(value)) {
-            return Long.MAX_VALUE;
-        }
         return value;
     }
 
@@ -427,71 +356,17 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return value < 0;
     }
 
-    private void flushCache() {
-        // flush of the cache entries if necessary
-        if (cache != null) {
-            List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.size());
-            List<byte[]> deleteBatch = new ArrayList<>(cache.size());
-
-            for (K key : cacheDirtyKeys) {
-                RocksDBCacheEntry entry = cache.get(key);
-
-                if (entry != null) {
-                    entry.isDirty = false;
-
-                    byte[] rawKey = serdes.rawKey(key);
-
-                    if (entry.value != null) {
-                        putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
-                    } else {
-                        deleteBatch.add(rawKey);
-                    }
-                }
-            }
-
-            putAllInternal(putBatch);
-
-            if (loggingEnabled) {
-                for (KeyValue<byte[], byte[]> kv : putBatch)
-                    changeLogger.add(Bytes.wrap(kv.key));
-            }
-
-            // check all removed entries and remove them in rocksDB
-            // TODO: can this be done in batch as well?
-            for (byte[] removedKey : deleteBatch) {
-                try {
-                    db.remove(wOptions, removedKey);
-                } catch (RocksDBException e) {
-                    throw new ProcessorStateException("Error while deleting 
with key " + serdes.keyFrom(removedKey) + " from store " + this.name, e);
-                }
-
-                if (loggingEnabled) {
-                    changeLogger.delete(Bytes.wrap(removedKey));
-                }
-            }
-
-            // reset dirty set
-            cacheDirtyKeys.clear();
-        }
-
-        if (loggingEnabled)
-            changeLogger.logChange(getter);
-
-    }
-
     @Override
     public synchronized void flush() {
         if (db == null) {
             return;
         }
-
-        // flush of the cache entries if necessary
-        flushCache();
-
+        if (loggingEnabled) {
+            changeLogger.logChange(getter);
+        }
         // flush RocksDB
         flushInternal();
     }
-
     /**
      * @throws ProcessorStateException if flushing failed because of any internal store exceptions
      */
@@ -521,7 +396,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         db = null;
     }
 
-    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+
+    public static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
         private final RocksIterator iter;
         private final StateSerdes<K, V> serdes;
 
@@ -530,7 +407,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             this.serdes = serdes;
         }
 
-        protected byte[] peekRawKey() {
+        public byte[] peekRawKey() {
             return iter.key();
         }
 
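The rewritten putAll() above funnels inserts through a RocksDB WriteBatch,
so the whole list is applied in a single atomic write instead of one write
per entry. A stand-alone sketch of that pattern against the rocksdbjni API
(the path and keys are placeholders; try-with-resources assumes a
reasonably recent Java binding, and remove() is the delete call of the
RocksDB version this patch targets):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class BatchWriteSketch {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/batch-write-sketch"); // placeholder path
                 WriteBatch batch = new WriteBatch();
                 WriteOptions writeOptions = new WriteOptions()) {
                // accumulate all mutations first ...
                batch.put("k1".getBytes(), "v1".getBytes());
                batch.put("k2".getBytes(), "v2".getBytes());
                batch.remove("k3".getBytes());  // deletes can ride in the same batch
                // ... then apply them atomically in one write
                db.write(writeOptions, batch);
            }
        }
    }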

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index db1d2ae..dd24320 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -51,8 +52,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
 
-    private static final long USE_CURRENT_TIMESTAMP = -1L;
-
     private volatile boolean open = false;
 
     // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
@@ -81,7 +80,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         private final Bytes from;
         private final Bytes to;
         private KeyValueIterator<Bytes, byte[]> currentIterator;
-        private Segment currentSegment;
+        private KeyValueStore<Bytes, byte[]> currentSegment;
 
         RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
             this(serdes, null, null, Collections.<Segment>emptyIterator());
@@ -180,10 +179,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     public RocksDBWindowStore<K, V> enableLogging() {
         loggingEnabled = true;
-
         return this;
     }
 
+
     @Override
     public String name() {
         return name;
@@ -258,9 +257,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void flush() {
-        for (Segment segment : segments.values()) {
-            if (segment != null)
+        for (KeyValueStore<Bytes, byte[]> segment : segments.values()) {
+            if (segment != null) {
                 segment.flush();
+            }
         }
 
         if (loggingEnabled)
@@ -271,7 +271,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     public void close() {
         open = false;
         flush();
-        for (Segment segment : segments.values()) {
+        for (KeyValueStore segment : segments.values()) {
             if (segment != null)
                 segment.close();
         }
@@ -279,7 +279,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+        byte[] rawKey = putAndReturnInternalKey(key, value, context.timestamp());
 
         if (rawKey != null && loggingEnabled) {
             changeLogger.add(Bytes.wrap(rawKey));
@@ -297,9 +297,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
     }
 
-    private byte[] putAndReturnInternalKey(K key, V value, long t) {
-        long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
-
+    private byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -309,7 +307,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        Segment segment = getOrCreateSegment(segmentId);
+        KeyValueStore<Bytes, byte[]> segment = getOrCreateSegment(segmentId);
         if (segment != null) {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
@@ -322,7 +320,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
+        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+        long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
             // A new segment will be created. Clean up old segments first.
@@ -332,14 +331,15 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         // If the record is within the retention period, put it in the store.
         Segment segment = getOrCreateSegment(segmentId);
-        if (segment != null)
-            segment.put(Bytes.wrap(binaryKey), binaryValue);
+        if (segment != null) {
+            segment.writeToStore(Bytes.wrap(binaryKey), binaryValue);
+        }
     }
 
     private byte[] getInternal(byte[] binaryKey) {
         long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
 
-        Segment segment = getSegment(segmentId);
+        KeyValueStore<Bytes, byte[]> segment = getSegment(segmentId);
         if (segment != null) {
             return segment.get(Bytes.wrap(binaryKey));
         } else {
@@ -380,22 +380,25 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private Segment getSegment(long segmentId) {
         final Segment segment = segments.get(segmentId % numSegments);
-        if (segment != null && segment.id != segmentId) {
+        if (!isSegment(segment, segmentId)) {
             return null;
         }
         return segment;
     }
 
+    private boolean isSegment(final Segment store, long segmentId) {
+        return store != null && store.id == segmentId;
+    }
 
     private Segment getOrCreateSegment(long segmentId) {
         if (segmentId <= currentSegmentId && segmentId > currentSegmentId - numSegments) {
             final long key = segmentId % numSegments;
             final Segment segment = segments.get(key);
-            if (segment != null && segment.id != segmentId) {
+            if (!isSegment(segment, segmentId)) {
                 cleanup();
             }
             if (!segments.containsKey(key)) {
-                final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
+                Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
                 newSegment.openDB(context);
                 segments.put(key, newSegment);
             }
@@ -423,12 +426,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     // this method is defined public since it is used for unit tests
     public String segmentName(long segmentId) {
-        return formatter.format(new Date(segmentId * segmentInterval));
+        return name + "-" + formatter.format(new Date(segmentId * 
segmentInterval));
     }
 
     public long segmentIdFromSegmentName(String segmentName) {
         try {
-            Date date = formatter.parse(segmentName);
+            Date date = formatter.parse(segmentName.substring(name.length() + 1));
             return date.getTime() / segmentInterval;
         } catch (Exception ex) {
             return -1L;
@@ -440,8 +443,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         HashSet<Long> segmentIds = new HashSet<>();
 
         for (Segment segment : segments.values()) {
-            if (segment != null)
+            if (segment != null) {
                 segmentIds.add(segment.id);
+            }
         }
 
         return segmentIds;

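The segmentName()/segmentIdFromSegmentName() change above prefixes the
store name onto each segment name and strips it off again when parsing, so
segments of different stores no longer collide on disk while the segment id
still round-trips. A small round-trip sketch, assuming a minute-resolution
yyyyMMddHHmm UTC formatter and the one-minute MIN_SEGMENT_INTERVAL (the
exact format string is defined outside this hunk and is an assumption
here):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.SimpleTimeZone;

    public class SegmentNameRoundTrip {
        public static void main(final String[] args) throws ParseException {
            final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); // assumed format
            formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));

            final String name = "my-store";        // placeholder store name
            final long segmentInterval = 60_000L;  // one minute, as in MIN_SEGMENT_INTERVAL
            final long segmentId = 24_000_000L;

            // segmentName(): store name + "-" + formatted segment start time
            final String segmentName = name + "-" + formatter.format(new Date(segmentId * segmentInterval));

            // segmentIdFromSegmentName(): strip the "<name>-" prefix, parse the rest
            final Date date = formatter.parse(segmentName.substring(name.length() + 1));
            final long recovered = date.getTime() / segmentInterval;

            System.out.println(segmentName + " -> " + recovered + " (expected " + segmentId + ")");
        }
    }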