KAFKA-3776: Unify store and downstream caching in streams

This is joint work between dguy and enothereska. The work implements KIP-63. 
Overview of main changes:

- New byte-based cache that acts as a buffer for any persistent store and for 
forwarding changes downstream.
- Forwarding record path changes: previously a record in a task completed 
end-to-end. Now it may be buffered in a processor node while other records 
complete in the task.
- Cleanup and state stores and decoupling of cache from state store and 
forwarding.
- More than 80 new unit and integration tests.

Author: Damian Guy <damian....@gmail.com>
Author: Eno Thereska <eno.there...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #1752 from enothereska/KAFKA-3776-poc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86aa0eb0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86aa0eb0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86aa0eb0

Branch: refs/heads/trunk
Commit: 86aa0eb0f274c6e44eb190ce250433419e011a67
Parents: 143a33b
Author: Damian Guy <damian....@gmail.com>
Authored: Fri Sep 16 09:58:36 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Fri Sep 16 09:58:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |  11 +-
 .../kafka/streams/kstream/JoinWindows.java      |   5 +
 .../kafka/streams/kstream/TimeWindows.java      |   5 +
 .../kafka/streams/kstream/UnlimitedWindows.java |   5 +
 .../apache/kafka/streams/kstream/Windows.java   |   1 +
 .../kstream/internals/CacheFlushListener.java   |  35 ++
 .../internals/ForwardingCacheFlushListener.java |  38 ++
 .../kstream/internals/KGroupedStreamImpl.java   |  12 +-
 .../kstream/internals/KGroupedTableImpl.java    |   1 +
 .../kstream/internals/KStreamAggregate.java     |  10 +-
 .../streams/kstream/internals/KStreamImpl.java  |   2 +-
 .../kstream/internals/KStreamMapValues.java     |   2 +-
 .../kstream/internals/KStreamReduce.java        |   7 +-
 .../internals/KStreamWindowAggregate.java       |  17 +-
 .../kstream/internals/KStreamWindowReduce.java  |  12 +-
 .../kstream/internals/KTableAggregate.java      |  13 +-
 .../streams/kstream/internals/KTableImpl.java   |   3 +-
 .../streams/kstream/internals/KTableReduce.java |  11 +-
 .../streams/kstream/internals/KTableSource.java |   5 +-
 .../kafka/streams/processor/Processor.java      |   2 +-
 .../streams/processor/ProcessorContext.java     |   1 +
 .../streams/processor/TopologyBuilder.java      |  14 +-
 .../processor/internals/AbstractTask.java       |  19 +-
 .../internals/InternalProcessorContext.java     |  49 +++
 .../internals/ProcessorContextImpl.java         | 113 +++--
 .../processor/internals/ProcessorNode.java      |   9 +-
 .../internals/ProcessorRecordContext.java       |  72 +++
 .../internals/ProcessorStateManager.java        |  17 +-
 .../processor/internals/ProcessorTopology.java  |  22 +-
 .../processor/internals/RecordContext.java      |  45 ++
 .../streams/processor/internals/SinkNode.java   |   4 +-
 .../streams/processor/internals/SourceNode.java |   3 +-
 .../processor/internals/StandbyContextImpl.java |  27 +-
 .../processor/internals/StandbyTask.java        |   4 +-
 .../streams/processor/internals/StreamTask.java |  88 ++--
 .../processor/internals/StreamThread.java       |  64 ++-
 .../org/apache/kafka/streams/state/Stores.java  |  22 +-
 .../state/internals/AbstractStoreSupplier.java  |   1 +
 .../state/internals/CachedStateStore.java       |  28 ++
 .../state/internals/CachingKeyValueStore.java   | 219 ++++++++++
 .../state/internals/CachingWindowStore.java     | 170 ++++++++
 .../DelegatingPeekingKeyValueIterator.java      |  73 ++++
 .../DelegatingPeekingWindowIterator.java        |  73 ++++
 .../streams/state/internals/LRUCacheEntry.java  |  92 ++++
 .../MergedSortedCacheKeyValueStoreIterator.java | 109 +++++
 .../MergedSortedCachedWindowStoreIterator.java  | 105 +++++
 .../streams/state/internals/NamedCache.java     | 298 +++++++++++++
 .../internals/PeekingKeyValueIterator.java      |  24 +
 .../state/internals/PeekingWindowIterator.java  |  25 ++
 .../internals/RocksDBKeyValueStoreSupplier.java |  28 +-
 .../streams/state/internals/RocksDBStore.java   | 201 ++-------
 .../state/internals/RocksDBWindowStore.java     |  48 +-
 .../internals/RocksDBWindowStoreSupplier.java   |  25 +-
 .../streams/state/internals/ThreadCache.java    | 311 +++++++++++++
 .../state/internals/ThreadCacheMetrics.java     |  40 ++
 .../state/internals/WindowStoreUtils.java       |   8 +
 .../integration/FanoutIntegrationTest.java      |  16 +
 .../KStreamAggregationDedupIntegrationTest.java | 264 +++++++++++
 .../KStreamAggregationIntegrationTest.java      |  19 +-
 .../KStreamKTableJoinIntegrationTest.java       | 102 +++--
 .../integration/KStreamRepartitionJoinTest.java |  51 ++-
 .../QueryableStateIntegrationTest.java          | 127 ++++--
 .../internals/KGroupedTableImplTest.java        |  46 ++
 .../internals/KStreamKStreamJoinTest.java       |  58 +--
 .../internals/KStreamKStreamLeftJoinTest.java   |  68 +--
 .../internals/KStreamWindowAggregateTest.java   | 119 +++--
 .../kstream/internals/KTableAggregateTest.java  |  98 ++++-
 .../kstream/internals/KTableFilterTest.java     |  18 +-
 .../kstream/internals/KTableForeachTest.java    |   1 +
 .../kstream/internals/KTableImplTest.java       |  10 +
 .../kstream/internals/KTableKTableJoinTest.java |  32 +-
 .../internals/KTableKTableLeftJoinTest.java     |  41 +-
 .../internals/KTableKTableOuterJoinTest.java    |  51 +--
 .../kstream/internals/KTableMapKeysTest.java    |   2 +-
 .../kstream/internals/KTableMapValuesTest.java  |  17 +-
 .../kstream/internals/KTableSourceTest.java     |  13 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |   9 +-
 .../streams/processor/TopologyBuilderTest.java  |   6 +-
 .../internals/ProcessorStateManagerTest.java    |  13 +-
 .../internals/ProcessorTopologyTest.java        |   2 +
 .../internals/PunctuationQueueTest.java         |   4 +-
 .../processor/internals/RecordContextStub.java  |  55 +++
 .../processor/internals/StandbyTaskTest.java    |  20 +-
 .../processor/internals/StreamTaskTest.java     |  14 +-
 .../processor/internals/StreamThreadTest.java   |   2 +-
 .../kafka/streams/smoketest/SmokeTestUtil.java  |   3 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   9 +-
 .../internals/AbstractKeyValueStoreTest.java    |   1 -
 .../internals/CachingKeyValueStoreTest.java     | 151 +++++++
 .../state/internals/CachingWindowStoreTest.java | 166 +++++++
 .../DelegatingPeekingKeyValueIteratorTest.java  |  78 ++++
 .../DelegatingPeekingWindowIteratorTest.java    |  92 ++++
 ...gedSortedCacheKeyValueStoreIteratorTest.java |  57 +++
 ...ergedSortedCacheWindowStoreIteratorTest.java |  97 +++++
 .../streams/state/internals/NamedCacheTest.java | 189 ++++++++
 .../internals/RocksDBKeyValueStoreTest.java     |  56 ++-
 .../state/internals/RocksDBWindowStoreTest.java | 259 ++++++-----
 .../state/internals/StateStoreTestUtils.java    |   1 +
 .../StreamThreadStateStoreProviderTest.java     |   4 +-
 .../state/internals/ThreadCacheTest.java        | 434 +++++++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    |  51 ++-
 .../apache/kafka/test/MockProcessorContext.java |  66 ++-
 .../kafka/test/MockProcessorSupplier.java       |   5 +-
 .../java/org/apache/kafka/test/MockReducer.java |  21 +
 .../kafka/test/ProcessorTopologyTestDriver.java |   6 +-
 105 files changed, 4784 insertions(+), 888 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e972887..23b5287 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -136,6 +136,9 @@ public class StreamsConfig extends AbstractConfig {
     public static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = 
"windowstore.changelog.additional.retention.ms";
     public static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for 
clock drift. Default is 1 day";
 
+    /** <code>cache.max.bytes.buffering</code> */
+    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = 
"cache.max.bytes.buffering";
+    public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number 
of memory bytes to be used for buffering across all threads";
 
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // 
required with no default value
@@ -247,7 +250,13 @@ public class StreamsConfig extends AbstractConfig {
                                         Type.LONG,
                                         24 * 60 * 60 * 1000,
                                         Importance.MEDIUM,
-                                        
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC);
+                                        
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
+                                .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                                        Type.LONG,
+                                        10 * 1024 * 1024L,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CACHE_MAX_BYTES_BUFFERING_DOC);
     }
 
     // this is the list of configs for underlying clients

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 2552148..1ac606e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -103,6 +103,11 @@ public class JoinWindows extends Windows<TimeWindow> {
     }
 
     @Override
+    public long size() {
+        return after + before;
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o == this) {
             return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 1ec4628..ef94cf9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -110,6 +110,11 @@ public class TimeWindows extends Windows<TimeWindow> {
     }
 
     @Override
+    public long size() {
+        return size;
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o == this) {
             return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 971f3c7..92f9ee9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -70,6 +70,11 @@ public class UnlimitedWindows extends 
Windows<UnlimitedWindow> {
     }
 
     @Override
+    public long size() {
+        return Long.MAX_VALUE;
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o == this) {
             return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index d0a5861..f060d39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -79,4 +79,5 @@ public abstract class Windows<W extends Window> {
      */
     public abstract Map<Long, W> windowsFor(long timestamp);
 
+    public abstract long size();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
new file mode 100644
index 0000000..c01ed0f
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+/**
+ * Listen to cache flush events
+ * @param <K>
+ * @param <V>
+ */
+public interface CacheFlushListener<K, V> {
+
+    /**
+     * Called when records are flushed from the {@link ThreadCache}
+     * @param key         key of the entry
+     * @param newValue    current value
+     * @param oldValue    previous value
+     */
+    void apply(final K key, final V newValue, final V oldValue);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
new file mode 100644
index 0000000..1796be9
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
+    private final ProcessorContext context;
+    private final boolean sendOldValues;
+
+    ForwardingCacheFlushListener(final ProcessorContext context, final boolean 
sendOldValues) {
+        this.context = context;
+        this.sendOldValues = sendOldValues;
+    }
+
+    @Override
+    public void apply(final K key, final V newValue, final V oldValue) {
+        if (sendOldValues) {
+            context.forward(key, new Change<>(newValue, oldValue));
+        } else {
+            context.forward(key, new Change<>(newValue, null));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 78e6a2c..9bc66e8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -151,18 +151,18 @@ public class KGroupedStreamImpl<K, V> extends 
AbstractStream<K> implements KGrou
                                                                    final 
Windows<W> windows,
                                                                    final 
String storeName) {
         return storeFactory(aggValSerde, storeName)
-            .windowed(windows.maintainMs(), windows.segments, false)
-            .build();
+                .windowed(windows.size(), windows.maintainMs(), 
windows.segments, false)
+                .build();
 
     }
 
     private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final 
Serde<T> aggValueSerde,
                                                                     final 
String storeName) {
         return Stores.create(storeName)
-            .withKeys(keySerde)
-            .withValues(aggValueSerde)
-            .persistent();
-
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
+                .persistent()
+                .enableCaching();
     }
 
     private <T> KTable<K, T> doAggregate(

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 82a800d..7aa2531 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -105,6 +105,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K> implements KGroup
             .withKeys(keySerde)
             .withValues(aggValueSerde)
             .persistent()
+            .enableCaching()
             .build();
 
         // send the aggregate key-value pairs to the intermediate topic for 
partitioning

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index dc6410d..428c513 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K, K, V, T> {
 
@@ -30,6 +31,7 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
     private final Initializer<T> initializer;
     private final Aggregator<K, V, T> aggregator;
 
+
     private boolean sendOldValues = false;
 
     public KStreamAggregate(String storeName, Initializer<T> initializer, 
Aggregator<K, V, T> aggregator) {
@@ -56,8 +58,8 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+            ((CachedStateStore) store).setFlushListener(new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
 
@@ -80,12 +82,6 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
 
             // update the store with the new value
             store.put(key, newAgg);
-
-            // send the old / new pair
-            if (sendOldValues)
-                context().forward(key, new Change<>(newAgg, oldAgg));
-            else
-                context().forward(key, new Change<>(newAgg, null));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 4d39b18..b9ed19a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -633,7 +633,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             .withKeys(keySerde)
             .withValues(valueSerde)
             .persistent()
-            .windowed(windows.maintainMs(), windows.segments, true)
+            .windowed(windows.size(), windows.maintainMs(), windows.segments, 
true)
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 06667e8..66c0f62 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -37,7 +37,7 @@ class KStreamMapValues<K, V, V1> implements 
ProcessorSupplier<K, V> {
 
     private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K key, V value) {
+        public void process(final K key, final V value) {
             V1 newValue = mapper.apply(value);
             context().forward(key, newValue);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index dd5ba45..6d24284 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, 
V, V> {
 
@@ -55,6 +56,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
             super.init(context);
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            ((CachedStateStore) store).setFlushListener(new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
 
@@ -79,11 +81,6 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
             // update the store with the new value
             store.put(key, newAgg);
 
-            // send the old / new pair
-            if (sendOldValues)
-                context().forward(key, new Change<>(newAgg, oldAgg));
-            else
-                context().forward(key, new Change<>(newAgg, null));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 125c7fc..437d304 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 import java.util.Map;
 
@@ -67,6 +68,7 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
             super.init(context);
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+            ((CachedStateStore) windowStore).setFlushListener(new 
ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
         }
 
         @Override
@@ -91,7 +93,7 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
 
             try (WindowStoreIterator<T> iter = windowStore.fetch(key, 
timeFrom, timeTo)) {
 
-                // for each matching window, try to update the corresponding 
key and send to the downstream
+                // for each matching window, try to update the corresponding 
key
                 while (iter.hasNext()) {
                     KeyValue<Long, T> entry = iter.next();
                     W window = matchedWindows.get(entry.key);
@@ -109,12 +111,6 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
 
-                        // forward the aggregated change pair
-                        if (sendOldValues)
-                            context().forward(new Windowed<>(key, window), new 
Change<>(newAgg, oldAgg));
-                        else
-                            context().forward(new Windowed<>(key, window), new 
Change<>(newAgg, null));
-
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -124,14 +120,7 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
             for (long windowStartMs : matchedWindows.keySet()) {
                 T oldAgg = initializer.apply();
                 T newAgg = aggregator.apply(key, value, oldAgg);
-
                 windowStore.put(key, newAgg, windowStartMs);
-
-                // send the new aggregate pair
-                if (sendOldValues)
-                    context().forward(new Windowed<>(key, 
matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
-                else
-                    context().forward(new Windowed<>(key, 
matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 763ccdd..2a47f72 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 import java.util.Map;
 
@@ -62,8 +63,8 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+            ((CachedStateStore) windowStore).setFlushListener(new 
ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
         }
 
         @Override
@@ -108,12 +109,6 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
 
-                        // forward the aggregated change pair
-                        if (sendOldValues)
-                            context().forward(new Windowed<>(key, window), new 
Change<>(newAgg, oldAgg));
-                        else
-                            context().forward(new Windowed<>(key, window), new 
Change<>(newAgg, null));
-
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -122,9 +117,6 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
             // create the new window for the rest of unmatched window that do 
not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
                 windowStore.put(key, value, windowStartMs);
-
-                // send the new aggregate pair (there will be no old value)
-                context().forward(new Windowed<>(key, 
matchedWindows.get(windowStartMs)), new Change<>(value, null));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 4a7c7c0..3f2ab97 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, 
T> {
 
@@ -57,10 +58,10 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
-
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+            ((CachedStateStore) store).setFlushListener(new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         /**
@@ -91,13 +92,8 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
 
             // update the store with the new value
             store.put(key, newAgg);
-
-            // send the old / new pair
-            if (sendOldValues)
-                context().forward(key, new Change<>(newAgg, oldAgg));
-            else
-                context().forward(key, new Change<>(newAgg, null));
         }
+
     }
 
     @Override
@@ -128,4 +124,5 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
         }
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6c73b11..ebe00d8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -422,7 +422,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
                                                                                
       keySerde,
                                                                                
       valSerde,
                                                                                
       false,
-                                                                               
       Collections.<String, String>emptyMap());
+                                                                               
       Collections.<String, String>emptyMap(),
+                                                                               
       true);
                 // mark this state as non internal hence it is read directly 
from a user topic
                 topology.addStateStore(storeSupplier, name);
                 source.materialize();

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index bab6bf3..a5457a5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
@@ -45,10 +46,10 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
     @Override
     public Processor<K, Change<V>> get() {
-        return new KTableAggregateProcessor();
+        return new KTableReduceProcessor();
     }
 
-    private class KTableAggregateProcessor extends AbstractProcessor<K, 
Change<V>> {
+    private class KTableReduceProcessor extends AbstractProcessor<K, 
Change<V>> {
 
         private KeyValueStore<K, V> store;
 
@@ -58,6 +59,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
             super.init(context);
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            ((CachedStateStore) store).setFlushListener(new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         /**
@@ -89,11 +91,6 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
             // update the store with the new value
             store.put(key, newAgg);
 
-            // send the old / new pair
-            if (sendOldValues)
-                context().forward(key, new Change<>(newAgg, oldAgg));
-            else
-                context().forward(key, new Change<>(newAgg, null));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 05befed..d8d389f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
@@ -72,6 +73,7 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            ((CachedStateStore) store).setFlushListener(new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         @Override
@@ -80,10 +82,7 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
             if (key == null)
                 throw new StreamsException("Record key for the source KTable 
from store name " + storeName + " should not be null.");
 
-            V oldValue = sendOldValues ? store.get(key) : null;
             store.put(key, value);
-
-            context().forward(key, new Change<>(value, oldValue));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 92fcf12..beaace3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -38,7 +38,7 @@ public interface Processor<K, V> {
 
     /**
      * Process the record with the given key and value.
-     * 
+     *
      * @param key the key for the record
      * @param value the value for the record
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index acecf91..d854a85 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -184,4 +184,5 @@ public interface ProcessorContext {
      *
      */
     Map<String, Object> appConfigsWithPrefix(String prefix);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ee61e73..f5fd571 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -767,12 +767,13 @@ public class TopologyBuilder {
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
         Map<String, SinkNode> topicSinkMap = new HashMap<>();
-        Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
+        Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+        Map<StateStore, ProcessorNode> storeToProcessorNodeMap = new 
HashMap<>();
 
         // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build();
+                final ProcessorNode node = factory.build();
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -782,7 +783,10 @@ public class TopologyBuilder {
                     }
                     for (String stateStoreName : ((ProcessorNodeFactory) 
factory).stateStoreNames) {
                         if (!stateStoreMap.containsKey(stateStoreName)) {
-                            stateStoreMap.put(stateStoreName, 
stateFactories.get(stateStoreName).supplier);
+                            final StateStoreSupplier supplier = 
stateFactories.get(stateStoreName).supplier;
+                            final StateStore stateStore = supplier.get();
+                            stateStoreMap.put(stateStoreName, stateStore);
+                            storeToProcessorNodeMap.put(stateStore, node);
                         }
                     }
                 } else if (factory instanceof SourceNodeFactory) {
@@ -815,9 +819,9 @@ public class TopologyBuilder {
             }
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap, 
topicSinkMap, new ArrayList<>(stateStoreMap.values()), 
sourceStoreToSourceTopic);
+        return new ProcessorTopology(processorNodes, topicSourceMap, 
topicSinkMap, new ArrayList<>(stateStoreMap.values()), 
sourceStoreToSourceTopic, storeToProcessorNodeMap);
     }
-
+    
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8a45dd6..54cbe4e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -23,8 +23,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -40,8 +40,8 @@ public abstract class AbstractTask {
     protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
     protected final Set<TopicPartition> partitions;
-    protected ProcessorContext processorContext;
-
+    protected InternalProcessorContext processorContext;
+    protected final ThreadCache cache;
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
@@ -52,16 +52,18 @@ public abstract class AbstractTask {
                            Consumer<byte[], byte[]> consumer,
                            Consumer<byte[], byte[]> restoreConsumer,
                            boolean isStandby,
-                           StateDirectory stateDirectory) {
+                           StateDirectory stateDirectory,
+                           final ThreadCache cache) {
         this.id = id;
         this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
+        this.cache = cache;
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(applicationId, id, 
partitions, restoreConsumer, isStandby, stateDirectory, 
topology.sourceStoreToSourceTopic());
+            this.stateMgr = new ProcessorStateManager(applicationId, id, 
partitions, restoreConsumer, isStandby, stateDirectory, 
topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap());
 
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state 
manager", e);
@@ -72,8 +74,7 @@ public abstract class AbstractTask {
         // set initial offset limits
         initializeOffsetLimits();
 
-        for (StateStoreSupplier stateStoreSupplier : 
this.topology.stateStoreSuppliers()) {
-            StateStore store = stateStoreSupplier.get();
+        for (StateStore store : this.topology.stateStores()) {
             store.init(this.processorContext, store);
         }
     }
@@ -98,6 +99,10 @@ public abstract class AbstractTask {
         return processorContext;
     }
 
+    public final ThreadCache cache() {
+        return cache;
+    }
+
     public abstract void commit();
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
new file mode 100644
index 0000000..251ff3f
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+/**
+ * For internal use so we can update the {@link RecordContext} and current
+ * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
+ * {@link ThreadCache}
+ */
+public interface InternalProcessorContext extends ProcessorContext {
+
+    /**
+     * Returns the current {@link RecordContext}
+     * @return the current {@link RecordContext}
+     */
+    RecordContext recordContext();
+
+    /**
+     * @param recordContext the {@link RecordContext} for the record about to 
be processes
+     */
+    void setRecordContext(RecordContext recordContext);
+
+    /**
+     * @param currentNode the current {@link ProcessorNode}
+     */
+    void setCurrentNode(ProcessorNode currentNode);
+
+    /**
+     * Get the thread-global cache
+     */
+    ThreadCache getCache();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a38839f..f4d4e83 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -21,15 +21,16 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
+import java.util.List;
 import java.util.Map;
 
-public class ProcessorContextImpl implements ProcessorContext, 
RecordCollector.Supplier {
+public class ProcessorContextImpl implements InternalProcessorContext, 
RecordCollector.Supplier {
 
     public static final String NONEXIST_TOPIC = "__null_topic__";
 
@@ -42,12 +43,10 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
     private final StreamsConfig config;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
-
+    private final ThreadCache cache;
     private boolean initialized;
-    private Long timestamp;
-    private String topic;
-    private Long offset;
-    private Integer partition;
+    private RecordContext recordContext;
+    private ProcessorNode currentNode;
 
     @SuppressWarnings("unchecked")
     public ProcessorContextImpl(TaskId id,
@@ -55,7 +54,8 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
                                 StreamsConfig config,
                                 RecordCollector collector,
                                 ProcessorStateManager stateMgr,
-                                StreamsMetrics metrics) {
+                                StreamsMetrics metrics,
+                                final ThreadCache cache) {
         this.id = id;
         this.task = task;
         this.metrics = metrics;
@@ -65,7 +65,7 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
         this.config = config;
         this.keySerde = config.keySerde();
         this.valSerde = config.valueSerde();
-
+        this.cache = cache;
         this.initialized = false;
     }
 
@@ -140,10 +140,22 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
         return stateMgr.getStore(name);
     }
 
+    @Override
+    public ThreadCache getCache() {
+        return cache;
+    }
 
+    /**
+     * @throws IllegalStateException if the task's record is null
+     */
     @Override
-    public synchronized String topic() {
-        if (topic == null || topic.equals(NONEXIST_TOPIC))
+    public String topic() {
+        if (recordContext == null)
+            throw new IllegalStateException("This should not happen as topic() 
should only be called while a record is processed");
+
+        String topic = recordContext.topic();
+
+        if (topic.equals(NONEXIST_TOPIC))
             return null;
         else
             return topic;
@@ -153,48 +165,77 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
      * @throws IllegalStateException if partition is null
      */
     @Override
-    public synchronized int partition() {
-        if (partition == null) {
+    public int partition() {
+        if (recordContext == null)
             throw new IllegalStateException("This should not happen as 
partition() should only be called while a record is processed");
-        }
-        return partition;
+
+        return recordContext.partition();
     }
 
     /**
      * @throws IllegalStateException if offset is null
      */
     @Override
-    public synchronized long offset() {
-        if (offset == null) {
+    public long offset() {
+        if (recordContext == null)
             throw new IllegalStateException("This should not happen as 
offset() should only be called while a record is processed");
-        }
-        return offset;
+
+        return recordContext.offset();
     }
 
     /**
      * @throws IllegalStateException if timestamp is null
      */
     @Override
-    public synchronized long timestamp() {
-        if (timestamp == null) {
-            throw new IllegalStateException("This should not happen as 
timestamp should be set during record processing");
-        }
-        return timestamp;
+    public long timestamp() {
+        if (recordContext == null)
+            throw new IllegalStateException("This should not happen as 
timestamp() should only be called while a record is processed");
+
+        return recordContext.timestamp();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value) {
-        task.forward(key, value);
+        ProcessorNode previousNode = currentNode;
+        try {
+            for (ProcessorNode child : (List<ProcessorNode>) 
currentNode.children()) {
+                currentNode = child;
+                child.process(key, value);
+            }
+        } finally {
+            currentNode = previousNode;
+        }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        task.forward(key, value, childIndex);
+        ProcessorNode previousNode = currentNode;
+        final ProcessorNode child = (ProcessorNode<K, V>) 
currentNode.children().get(childIndex);
+        currentNode = child;
+        try {
+            child.process(key, value);
+        } finally {
+            currentNode = previousNode;
+        }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value, String childName) {
-        task.forward(key, value, childName);
+        for (ProcessorNode child : (List<ProcessorNode<K, V>>) 
currentNode.children()) {
+            if (child.name().equals(childName)) {
+                ProcessorNode previousNode = currentNode;
+                currentNode = child;
+                try {
+                    child.process(key, value);
+                    return;
+                } finally {
+                    currentNode = previousNode;
+                }
+            }
+        }
     }
 
     @Override
@@ -217,10 +258,18 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
         return config.originalsWithPrefix(prefix);
     }
 
-    public synchronized void update(final StampedRecord record) {
-        this.timestamp = record.timestamp;
-        this.partition = record.partition();
-        this.offset = record.offset();
-        this.topic = record.topic();
+    @Override
+    public void setRecordContext(final RecordContext recordContext) {
+        this.recordContext = recordContext;
+    }
+
+    @Override
+    public RecordContext recordContext() {
+        return this.recordContext;
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 64ca032..c05702b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -44,6 +44,7 @@ public class ProcessorNode<K, V> {
         this.stateStores = stateStores;
     }
 
+
     public final String name() {
         return name;
     }
@@ -64,14 +65,14 @@ public class ProcessorNode<K, V> {
         processor.init(context);
     }
 
-    public void process(K key, V value) {
-        processor.process(key, value);
-    }
-
     public void close() {
         processor.close();
     }
 
+    public void process(final K key, final V value) {
+        processor.process(key, value);
+    }
+
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
new file mode 100644
index 0000000..55452ad
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+
+public class ProcessorRecordContext implements RecordContext {
+
+    private final long timestamp;
+    private final long offset;
+    private final String topic;
+    private final int partition;
+
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic) {
+
+        this.timestamp = timestamp;
+        this.offset = offset;
+        this.topic = topic;
+        this.partition = partition;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String topic() {
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        return partition;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final ProcessorRecordContext that = (ProcessorRecordContext) o;
+        return timestamp == that.timestamp &&
+                offset == that.offset &&
+                partition == that.partition &&
+                Objects.equals(topic, that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestamp, offset, topic, partition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 123e475..5e5eaa9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -64,21 +65,24 @@ public class ProcessorStateManager {
     private final Map<String, String> sourceStoreToSourceTopic;
     private final TaskId taskId;
     private final StateDirectory stateDirectory;
+    private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
 
     /**
      * @throws IOException if any error happens while creating or locking the 
state directory
      */
     public ProcessorStateManager(String applicationId, TaskId taskId, 
Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, 
boolean isStandby,
-        StateDirectory stateDirectory, final Map<String, String> 
sourceStoreToSourceTopic) throws IOException {
+                                 StateDirectory stateDirectory, final 
Map<String, String> sourceStoreToSourceTopic,
+                                 final Map<StateStore, ProcessorNode> 
stateStoreProcessorNodeMap) throws IOException {
         this.applicationId = applicationId;
         this.defaultPartition = taskId.partition;
         this.taskId = taskId;
         this.stateDirectory = stateDirectory;
+        this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap;
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
             this.partitionForTopic.put(source.topic(), source);
         }
-        this.stores = new HashMap<>();
+        this.stores = new LinkedHashMap<>();
         this.loggingEnabled = new HashSet<>();
         this.restoreConsumer = restoreConsumer;
         this.restoredOffsets = new HashMap<>();
@@ -288,11 +292,16 @@ public class ProcessorStateManager {
         return stores.get(name);
     }
 
-    public void flush() {
+    public void flush(final InternalProcessorContext context) {
         if (!this.stores.isEmpty()) {
             log.debug("task [{}] Flushing stores.", taskId);
-            for (StateStore store : this.stores.values())
+            for (StateStore store : this.stores.values()) {
+                final ProcessorNode processorNode = 
stateStoreProcessorNodeMap.get(store);
+                if (processorNode != null) {
+                    context.setCurrentNode(processorNode);
+                }
                 store.flush();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 221d152..04c0261 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,7 +16,7 @@
  */
 
 package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -28,18 +28,22 @@ public class ProcessorTopology {
     private final List<ProcessorNode> processorNodes;
     private final Map<String, SourceNode> sourceByTopics;
     private final Map<String, SinkNode> sinkByTopics;
-    private final List<StateStoreSupplier> stateStoreSuppliers;
+    private final List<StateStore> stateStores;
     private final Map<String, String> sourceStoreToSourceTopic;
+    private final Map<StateStore, ProcessorNode> storeToProcessorNodeMap;
+
     public ProcessorTopology(List<ProcessorNode> processorNodes,
                              Map<String, SourceNode> sourceByTopics,
                              Map<String, SinkNode> sinkByTopics,
-                             List<StateStoreSupplier> stateStoreSuppliers,
-                             Map<String, String> sourceStoreToSourceTopic) {
+                             List<StateStore> stateStores,
+                             Map<String, String> sourceStoreToSourceTopic,
+                             Map<StateStore, ProcessorNode> 
storeToProcessorNodeMap) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
         this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
-        this.stateStoreSuppliers = 
Collections.unmodifiableList(stateStoreSuppliers);
+        this.stateStores = Collections.unmodifiableList(stateStores);
         this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
+        this.storeToProcessorNodeMap = 
Collections.unmodifiableMap(storeToProcessorNodeMap);
     }
 
     public Set<String> sourceTopics() {
@@ -70,14 +74,18 @@ public class ProcessorTopology {
         return processorNodes;
     }
 
-    public List<StateStoreSupplier> stateStoreSuppliers() {
-        return stateStoreSuppliers;
+    public List<StateStore> stateStores() {
+        return stateStores;
     }
 
     public Map<String, String> sourceStoreToSourceTopic() {
         return sourceStoreToSourceTopic;
     }
 
+    public Map<StateStore, ProcessorNode> storeToProcessorNodeMap() {
+        return storeToProcessorNodeMap;
+    }
+
     private String childrenToString(List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
new file mode 100644
index 0000000..f37f3e9
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.Processor;
+
+/**
+ * The context associated with the current record being processed by
+ * an {@link Processor}
+ */
+public interface RecordContext {
+    /**
+     * @return The offset of the original record received from Kafka
+     */
+    long offset();
+
+    /**
+     * @return The timestamp extracted from the record received from Kafka
+     */
+    long timestamp();
+
+    /**
+     * @return The topic the record was received on
+     */
+    String topic();
+
+    /**
+     * @return The partition the record was received on
+     */
+    int partition();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 6907858..2b5692d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -65,9 +65,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     }
 
+
     @Override
-    public void process(K key, V value) {
-        // send to all the registered topics
+    public void process(final K key, final V value) {
         RecordCollector collector = ((RecordCollector.Supplier) 
context).recordCollector();
         collector.send(new ProducerRecord<>(topic, null, context.timestamp(), 
key, value), keySerializer, valSerializer, partitioner);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 90da1de..4bc3a53 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -60,8 +60,9 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
             ((ChangedDeserializer) 
this.valDeserializer).setInner(context.valueSerde().deserializer());
     }
 
+
     @Override
-    public void process(K key, V value) {
+    public void process(final K key, final V value) {
         context.forward(key, value);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 039ab66..563dbce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,15 +20,14 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-
+import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
 import java.util.Map;
 
-public class StandbyContextImpl implements ProcessorContext, 
RecordCollector.Supplier {
+public class StandbyContextImpl implements InternalProcessorContext, 
RecordCollector.Supplier {
 
     private final TaskId id;
     private final String applicationId;
@@ -38,6 +37,7 @@ public class StandbyContextImpl implements ProcessorContext, 
RecordCollector.Sup
     private final StreamsConfig config;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
+    private final ThreadCache zeroSizedCache = new ThreadCache(0);
 
     private boolean initialized;
 
@@ -120,6 +120,11 @@ public class StandbyContextImpl implements 
ProcessorContext, RecordCollector.Sup
         throw new UnsupportedOperationException("this should not happen: 
getStateStore() not supported in standby tasks.");
     }
 
+    @Override
+    public ThreadCache getCache() {
+        return zeroSizedCache;
+    }
+
     /**
      * @throws UnsupportedOperationException
      */
@@ -201,4 +206,20 @@ public class StandbyContextImpl implements 
ProcessorContext, RecordCollector.Sup
     public Map<String, Object> appConfigsWithPrefix(String prefix) {
         return config.originalsWithPrefix(prefix);
     }
+
+    @Override
+    public RecordContext recordContext() {
+        throw new UnsupportedOperationException("this should not happen: 
recordContext not supported in standby tasks.");
+    }
+
+    @Override
+    public void setRecordContext(final RecordContext recordContext) {
+        throw new UnsupportedOperationException("this should not happen: 
setRecordContext not supported in standby tasks.");
+    }
+
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        // no-op. can't throw as this is called on commit when the StateStores 
get flushed.
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index a22bea9..384a1a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask {
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamsConfig config,
                        StreamsMetrics metrics, final StateDirectory 
stateDirectory) {
-        super(id, applicationId, partitions, topology, consumer, 
restoreConsumer, true, stateDirectory);
+        super(id, applicationId, partitions, topology, consumer, 
restoreConsumer, true, stateDirectory, null);
 
         log.info("task [{}] Creating processorContext", id());
 
@@ -92,7 +92,7 @@ public class StandbyTask extends AbstractTask {
 
     public void commit() {
         log.debug("task [{}] flushing", id());
-        stateMgr.flush();
+        stateMgr.flush(processorContext);
 
         // reinitialize offset limits
         initializeOffsetLimits();

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 476ec2e..2d40d88 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,12 +26,12 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static java.util.Collections.singleton;
@@ -56,7 +56,6 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
-    private StampedRecord currRecord = null;
     private ProcessorNode currNode = null;
 
     private boolean requiresPoll = true;
@@ -83,8 +82,9 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
                       StreamsMetrics metrics,
-                      StateDirectory stateDirectory) {
-        super(id, applicationId, partitions, topology, consumer, 
restoreConsumer, false, stateDirectory);
+                      StateDirectory stateDirectory,
+                      ThreadCache cache) {
+        super(id, applicationId, partitions, topology, consumer, 
restoreConsumer, false, stateDirectory, cache);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = 
config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
@@ -110,7 +110,7 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         log.info("task [{}] Creating restoration consumer client", id());
 
         // initialize the topology with its own context
-        this.processorContext = new ProcessorContextImpl(id, this, config, 
recordCollector, stateMgr, metrics);
+        this.processorContext = new ProcessorContextImpl(id, this, config, 
recordCollector, stateMgr, metrics, cache);
 
         // initialize the state stores
         initializeStateStores();
@@ -165,19 +165,18 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
 
         try {
             // process the record by passing to the source node of the topology
-            this.currRecord = record;
             this.currNode = recordInfo.node();
             TopicPartition partition = recordInfo.partition();
 
-            log.debug("task [{}] Start processing one record [{}]", id(), 
currRecord);
+            log.debug("task [{}] Start processing one record [{}]", id(), 
record);
+            final ProcessorRecordContext recordContext = 
createRecordContext(record);
+            updateProcessorContext(recordContext, currNode);
+            this.currNode.process(record.key(), record.value());
 
-            updateContext(currRecord);
-            this.currNode.process(currRecord.key(), currRecord.value());
-
-            log.debug("task [{}] Completed processing one record [{}]", id(), 
currRecord);
+            log.debug("task [{}] Completed processing one record [{}]", id(), 
record);
 
             // update the consumed offset map after processing is done
-            consumedOffsets.put(partition, currRecord.offset());
+            consumedOffsets.put(partition, record.offset());
             commitOffsetNeeded = true;
 
             // after processing this record, if its partition queue's buffered 
size has been
@@ -191,13 +190,18 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
                 requiresPoll = true;
             }
         } finally {
-            this.currRecord = null;
+            processorContext.setCurrentNode(null);
             this.currNode = null;
         }
 
         return partitionGroup.numBuffered();
     }
 
+    private void updateProcessorContext(final ProcessorRecordContext 
recordContext, final ProcessorNode currNode) {
+        processorContext.setRecordContext(recordContext);
+        processorContext.setCurrentNode(currNode);
+    }
+
     public boolean requiresPoll() {
         return requiresPoll;
     }
@@ -226,23 +230,16 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
             throw new IllegalStateException(String.format("task [%s] Current 
node is not null", id()));
 
         currNode = node;
-        currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
-        updateContext(currRecord);
+        final StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, 
timestamp);
+        updateProcessorContext(createRecordContext(stampedRecord), node);
         try {
             node.processor().punctuate(timestamp);
         } finally {
+            processorContext.setCurrentNode(null);
             currNode = null;
-            currRecord = null;
         }
     }
 
-    private void updateContext(final StampedRecord record) {
-        ((ProcessorContextImpl) processorContext).update(record);
-    }
-
-    public StampedRecord record() {
-        return this.currRecord;
-    }
 
     public ProcessorNode node() {
         return this.currNode;
@@ -253,7 +250,7 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
      */
     public void commit() {
         // 1) flush local state
-        stateMgr.flush();
+        stateMgr.flush(processorContext);
 
         // 2) flush produced records in the downstream and change logs of 
local states
         recordCollector.flush();
@@ -338,45 +335,8 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         return new RecordQueue(partition, source);
     }
 
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value) {
-        ProcessorNode thisNode = currNode;
-        try {
-            for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
thisNode.children()) {
-                currNode = childNode;
-                childNode.process(key, value);
-            }
-        } finally {
-            currNode = thisNode;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, int childIndex) {
-        ProcessorNode thisNode = currNode;
-        ProcessorNode childNode = (ProcessorNode<K, V>) 
thisNode.children().get(childIndex);
-        currNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currNode = thisNode;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, String childName) {
-        ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
thisNode.children()) {
-            if (childNode.name().equals(childName)) {
-                currNode = childNode;
-                try {
-                    childNode.process(key, value);
-                } finally {
-                    currNode = thisNode;
-                }
-                break;
-            }
-        }
+    private ProcessorRecordContext createRecordContext(final StampedRecord 
currRecord) {
+        return new ProcessorRecordContext(currRecord.timestamp, 
currRecord.offset(), currRecord.partition(), currRecord.topic());
     }
 
     /**
@@ -387,4 +347,6 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
     public String toString() {
         return super.toString();
     }
+
+
 }

Reply via email to