http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 107a5e4..eb16bba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -31,27 +33,40 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
+
 public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
+    private final long windowSize;
+    private final boolean enableCaching;
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null, logged, logConfig);
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
+        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null, windowSize, logged, logConfig, enableCaching);
     }
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;
+        this.windowSize = windowSize;
+        this.enableCaching = enableCaching;
+    }
+
+    public String name() {
+        return name;
     }
 
     public StateStore get() {
-        RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
+        if (!enableCaching) {
+            final RocksDBWindowStore<K, V> rocksDbStore = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
+            return new MeteredWindowStore<>(logged ? rocksDbStore.enableLogging() : rocksDbStore, "rocksdb-window", time);
+        }
 
-        return new MeteredWindowStore<>(logged ? store.enableLogging() : store, "rocksdb-window", time);
+        final RocksDBWindowStore<Bytes, byte[]> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, false, Serdes.Bytes(), Serdes.ByteArray());
+        return new CachingWindowStore<>(new MeteredWindowStore<>(logged ? store.enableLogging() : store, "rocksdb-window", time), keySerde, valueSerde, windowSize);
     }
 
     public long retentionPeriod() {

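With the new enableCaching flag, get() now layers a CachingWindowStore over a MeteredWindowStore over a byte-keyed RocksDBWindowStore; without it, the previous wiring is kept. Below is a minimal sketch, not part of this commit, of how the widened constructor might be invoked; the store name, serdes, retention, and window size values are illustrative only.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;

    public class CachedWindowStoreSupplierSketch {
        public static void main(final String[] args) {
            // Parameter order follows the new constructor above; the values are illustrative.
            final RocksDBWindowStoreSupplier<String, Long> supplier = new RocksDBWindowStoreSupplier<>(
                "orders",                                 // name (illustrative)
                TimeUnit.DAYS.toMillis(1),                // retentionPeriod
                3,                                        // numSegments
                false,                                    // retainDuplicates
                Serdes.String(),                          // keySerde
                Serdes.Long(),                            // valueSerde
                TimeUnit.MINUTES.toMillis(5),             // windowSize (new in this commit)
                true,                                     // logged
                Collections.<String, String>emptyMap(),   // logConfig
                true);                                    // enableCaching (new in this commit)

            // enableCaching == true: CachingWindowStore -> MeteredWindowStore -> RocksDBWindowStore<Bytes, byte[]>
            final StateStore store = supplier.get();
            System.out.println(store.name());
        }
    }
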
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
new file mode 100644
index 0000000..e24dc7a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * An in-memory LRU cache store similar to {@link MemoryLRUCache} but byte-based, not
+ * record-based.
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class ThreadCache {
+    private static final Logger log = LoggerFactory.getLogger(ThreadCache.class);
+    private final String name;
+    private final long maxCacheSizeBytes;
+    private final Map<String, NamedCache> caches = new HashMap<>();
+    private final ThreadCacheMetrics metrics;
+
+    // internal stats
+    private long numPuts = 0;
+    private long numGets = 0;
+    private long numEvicts = 0;
+    private long numFlushes = 0;
+
+    public interface DirtyEntryFlushListener {
+        void apply(final List<DirtyEntry> dirty);
+    }
+
+    public ThreadCache(long maxCacheSizeBytes) {
+        this(null, maxCacheSizeBytes, null);
+    }
+
+    public ThreadCache(final String name, long maxCacheSizeBytes, final ThreadCacheMetrics metrics) {
+        this.name = name;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        this.metrics = metrics != null ? metrics : new NullThreadCacheMetrics();
+    }
+
+    public long puts() {
+        return numPuts;
+    }
+
+    public long gets() {
+        return numGets;
+    }
+
+    public long evicts() {
+        return numEvicts;
+    }
+
+    public long flushes() {
+        return numFlushes;
+    }
+
+    /**
+     * Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called
+     *
+     * @param namespace
+     * @param listener
+     */
+    public void addDirtyEntryFlushListener(final String namespace, DirtyEntryFlushListener listener) {
+        final NamedCache cache = getOrCreateCache(namespace);
+        cache.setListener(listener);
+    }
+
+    public void flush(final String namespace) {
+        numFlushes++;
+
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return;
+        }
+        cache.flush();
+
+        log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
+            name, puts(), gets(), evicts(), flushes());
+    }
+
+    public LRUCacheEntry get(final String namespace, byte[] key) {
+        numGets++;
+
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return null;
+        }
+        return cache.get(Bytes.wrap(key));
+    }
+
+    public void put(final String namespace, byte[] key, LRUCacheEntry value) {
+        numPuts++;
+
+        final NamedCache cache = getOrCreateCache(namespace);
+        cache.put(Bytes.wrap(key), value);
+        maybeEvict(namespace);
+    }
+
+    public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
+        final NamedCache cache = getOrCreateCache(namespace);
+        return cache.putIfAbsent(Bytes.wrap(key), value);
+    }
+
+    public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
+        final NamedCache cache = getOrCreateCache(namespace);
+        cache.putAll(entries);
+    }
+
+    public LRUCacheEntry delete(final String namespace, final byte[] key) {
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return null;
+        }
+
+        return cache.delete(Bytes.wrap(key));
+    }
+
+    public MemoryLRUCacheBytesIterator range(final String namespace, final byte[] from, final byte[] to) {
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace));
+        }
+        return new MemoryLRUCacheBytesIterator(cache.keyRange(cacheKey(from), cacheKey(to)), cache);
+    }
+
+    public MemoryLRUCacheBytesIterator all(final String namespace) {
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace));
+        }
+        return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
+    }
+
+
+    public long size() {
+        long size = 0;
+        for (NamedCache cache : caches.values()) {
+            size += cache.size();
+            if (isOverflowing(size)) {
+                return Long.MAX_VALUE;
+            }
+        }
+
+        if (isOverflowing(size)) {
+            return Long.MAX_VALUE;
+        }
+        return size;
+    }
+
+    private boolean isOverflowing(final long size) {
+        return size < 0;
+    }
+
+    long sizeBytes() {
+        long sizeInBytes = 0;
+        for (final NamedCache namedCache : caches.values()) {
+            sizeInBytes += namedCache.sizeInBytes();
+        }
+        return sizeInBytes;
+    }
+
+    private void maybeEvict(final String namespace) {
+        while (sizeBytes() > maxCacheSizeBytes) {
+            final NamedCache cache = getOrCreateCache(namespace);
+            cache.evict();
+
+            numEvicts++;
+        }
+    }
+
+    private synchronized NamedCache getCache(final String namespace) {
+        return caches.get(namespace);
+    }
+
+    private synchronized NamedCache getOrCreateCache(final String name) {
+        NamedCache cache = caches.get(name);
+        if (cache == null) {
+            cache = new NamedCache(name, this.metrics);
+            caches.put(name, cache);
+        }
+        return cache;
+    }
+
+    private Bytes cacheKey(final byte[] keyBytes) {
+        return Bytes.wrap(keyBytes);
+    }
+
+
+    static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<byte[], LRUCacheEntry> {
+        private final Iterator<Bytes> keys;
+        private final NamedCache cache;
+        private KeyValue<byte[], LRUCacheEntry> nextEntry;
+
+        MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
+            this.keys = keys;
+            this.cache = cache;
+        }
+
+        public byte[] peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return nextEntry.key;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextEntry != null) {
+                return true;
+            }
+
+            while (keys.hasNext() && nextEntry == null) {
+                internalNext();
+            }
+
+            return nextEntry != null;
+        }
+
+        @Override
+        public KeyValue<byte[], LRUCacheEntry> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            final KeyValue<byte[], LRUCacheEntry> result = nextEntry;
+            nextEntry = null;
+            return result;
+        }
+
+        private void internalNext() {
+            Bytes cacheKey = keys.next();
+            final LRUCacheEntry entry = cache.get(cacheKey);
+            if (entry == null) {
+                return;
+            }
+
+            nextEntry = new KeyValue<>(cacheKey.get(), entry);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove not supported by MemoryLRUCacheBytesIterator");
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+
+    public static class DirtyEntry {
+        private final Bytes key;
+        private final byte[] newValue;
+        private final RecordContext recordContext;
+
+        public DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) {
+            this.key = key;
+            this.newValue = newValue;
+            this.recordContext = recordContext;
+        }
+
+        public Bytes key() {
+            return key;
+        }
+
+        public byte[] newValue() {
+            return newValue;
+        }
+
+        public RecordContext recordContext() {
+            return recordContext;
+        }
+    }
+
+    public static class NullThreadCacheMetrics implements ThreadCacheMetrics {
+        @Override
+        public Sensor addCacheSensor(String entityName, String operationName, String... tags) {
+            return null;
+        }
+
+        @Override
+        public void recordCacheSensor(Sensor sensor, double value) {
+            // do nothing
+        }
+
+    }
+}

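Below is a minimal usage sketch of the new ThreadCache, not part of this commit and based only on the API shown above; the LRUCacheEntry(byte[]) constructor is an assumption, since that class is not included in this excerpt.

    import org.apache.kafka.streams.state.internals.LRUCacheEntry;
    import org.apache.kafka.streams.state.internals.ThreadCache;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class ThreadCacheUsageSketch {
        public static void main(final String[] args) {
            // 10 MB cache shared by one stream thread; null metrics falls back to NullThreadCacheMetrics.
            final ThreadCache cache = new ThreadCache("stream-thread-1", 10 * 1024 * 1024L, null);

            // Called with the dirty entries whenever this namespace is flushed or entries
            // are evicted to keep the cache under maxCacheSizeBytes.
            cache.addDirtyEntryFlushListener("my-store", new ThreadCache.DirtyEntryFlushListener() {
                @Override
                public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                    for (final ThreadCache.DirtyEntry entry : dirty) {
                        // forward entry.key() / entry.newValue() to the underlying store and changelog
                    }
                }
            });

            final byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
            final byte[] value = "v1".getBytes(StandardCharsets.UTF_8);
            cache.put("my-store", key, new LRUCacheEntry(value)); // LRUCacheEntry(byte[]) is assumed here
            final LRUCacheEntry hit = cache.get("my-store", key); // counted by gets()
            cache.flush("my-store");                              // drains dirty entries to the listener
            System.out.println("entries: " + cache.size() + ", hit: " + (hit != null));
        }
    }
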
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
new file mode 100644
index 0000000..cad2697
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+/**
+ * The Kafka Streams metrics interface for adding metric sensors and collecting metric values.
+ */
+public interface ThreadCacheMetrics {
+
+    /**
+     * Add the hit ratio sensor.
+     * @param entityName Name of the entity, could be the name of the cache instance, etc.
+     * @param operationName Name of the operation, could be "hit ratio".
+     * @param tags Additional tags of the sensor.
+     * @return The added sensor.
+     */
+    Sensor addCacheSensor(String entityName, String operationName, String... tags);
+
+    /**
+     * Record the given value of the sensor.
+     */
+    void recordCacheSensor(Sensor sensor, double value);
+}

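As a hypothetical illustration, not part of this commit, the interface could be backed by the client-side org.apache.kafka.common.metrics.Metrics registry roughly as below; the sensor and metric names and the key/value tag convention are assumptions.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    import java.util.HashMap;
    import java.util.Map;

    public class SimpleThreadCacheMetrics implements ThreadCacheMetrics {
        private final Metrics metrics = new Metrics();

        @Override
        public Sensor addCacheSensor(final String entityName, final String operationName, final String... tags) {
            final Sensor sensor = metrics.sensor(entityName + "-" + operationName);
            final Map<String, String> tagMap = new HashMap<>();
            for (int i = 0; i + 1 < tags.length; i += 2) {
                tagMap.put(tags[i], tags[i + 1]); // assumes tags arrive as key/value pairs
            }
            sensor.add(new MetricName(operationName + "-avg", entityName, "average value", tagMap), new Avg());
            return sensor;
        }

        @Override
        public void recordCacheSensor(final Sensor sensor, final double value) {
            sensor.record(value);
        }
    }
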
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 41cd3f3..1ea6bef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -56,6 +56,14 @@ public class WindowStoreUtils {
         return serdes.keyFrom(bytes);
     }
 
+    public static Bytes bytesKeyFromBinaryKey(byte[] binaryKey) {
+        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+        return Bytes.wrap(bytes);
+    }
+
     public static long timestampFromBinaryKey(byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
     }

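For reference, a small sketch, not part of this commit, of the binary window key layout these helpers operate on: the serialized record key is followed by a timestamp and a sequence-number suffix. The 8-byte timestamp and 4-byte sequence-number widths are assumed here, standing in for TIMESTAMP_SIZE and SEQNUM_SIZE defined elsewhere in this class.

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.state.internals.WindowStoreUtils;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class WindowKeyLayoutSketch {
        public static void main(final String[] args) {
            final byte[] keyBytes = "user-1".getBytes(StandardCharsets.UTF_8);

            // [key bytes][timestamp][sequence number]; 8 + 4 byte widths assumed.
            final ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + 8 + 4);
            buf.put(keyBytes).putLong(1462000000000L).putInt(0);
            final byte[] binaryKey = buf.array();

            final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryKey);       // == Bytes.wrap(keyBytes)
            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey); // == 1462000000000L
            System.out.println(key + " @ " + timestamp);
        }
    }
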
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index efc427a..a5fb076 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -35,6 +35,10 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,6 +68,7 @@ import static org.junit.Assert.assertThat;
  * }
  * </pre>
  */
+@RunWith(Parameterized.class)
 public class FanoutIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
@@ -80,6 +85,16 @@ public class FanoutIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
+
+    @Parameter
+    public long cacheSizeBytes;
+
+    //Single parameter, use Object[]
+    @Parameters
+    public static Object[] data() {
+        return new Object[] {0, 10 * 1024 * 1024L};
+    }
+
     @Test
     public void shouldFanoutTheInput() throws Exception {
         final List<String> inputValues = Arrays.asList("Hello", "World");
@@ -101,6 +116,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
         final KStream<byte[], String> stream2 = stream1.mapValues(

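The change above is the pattern repeated across the integration tests in this commit: each test class becomes a JUnit 4 parameterized test that runs once with caching disabled (0 bytes) and once with a 10 MB cache. A self-contained sketch of that single-parameter idiom, with an illustrative test body:

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameter;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class CacheSizeParameterizedSketch {

        // Injected by the Parameterized runner; must be public.
        @Parameter
        public long cacheSizeBytes;

        // With a single parameter, a plain Object[] is enough; each element is one run.
        @Parameters
        public static Object[] data() {
            return new Object[] {0, 10 * 1024 * 1024L};
        }

        @Test
        public void runsOncePerCacheSize() {
            // First run sees 0 (caching effectively disabled), second run sees 10 MB.
            Assert.assertTrue(cacheSizeBytes >= 0);
        }
    }
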
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
new file mode 100644
index 0000000..ab08dbe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Similar to KStreamAggregationIntegrationTest but with deduplication enabled
+ * by virtue of having a large commit interval
+ */
+public class KStreamAggregationDedupIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private static volatile int testNo = 0;
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String outputTopic;
+    private KGroupedStream<String, String> groupedStream;
+    private Reducer<String> reducer;
+    private KStream<Integer, String> stream;
+
+
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        String applicationId = "kgrouped-stream-test-" +
+            testNo;
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+
+        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        groupedStream = stream
+            .groupBy(
+                mapper,
+                Serdes.String(),
+                Serdes.String());
+
+        reducer = new Reducer<String>() {
+            @Override
+            public String apply(String value1, String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+
+    @Test
+    public void shouldReduce() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream
+            .reduce(reducer, "reduce-by-key")
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, String>> results = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 5);
+
+        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
+            @Override
+            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("A", "A:A"),
+            KeyValue.pair("B", "B:B"),
+            KeyValue.pair("C", "C:C"),
+            KeyValue.pair("D", "D:D"),
+            KeyValue.pair("E", "E:E"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
+                                                                            final KeyValue<K, V> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.compareTo(o2.value);
+        }
+        return keyComparison;
+    }
+
+    @Test
+    public void shouldReduceWindowed() throws Exception {
+        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstBatchTimestamp);
+        long secondBatchTimestamp = System.currentTimeMillis();
+        produceMessages(secondBatchTimestamp);
+        produceMessages(secondBatchTimestamp);
+
+        groupedStream
+            .reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
+            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, String value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 10);
+
+        Comparator<KeyValue<String, String>>
+            comparator =
+            new Comparator<KeyValue<String, String>>() {
+                @Override
+                public int compare(final KeyValue<String, String> o1,
+                                   final KeyValue<String, String> o2) {
+                    return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedOutput, comparator);
+        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+
+        assertThat(windowedOutput, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+                new KeyValue<>("B@" + firstBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+                new KeyValue<>("C@" + firstBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+                new KeyValue<>("D@" + firstBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+                new KeyValue<>("E@" + firstBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E:E")
+            )
+        ));
+    }
+
+
+
+    private void produceMessages(long timestamp)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            streamOneInput,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(2, "B"),
+                new KeyValue<>(3, "C"),
+                new KeyValue<>(4, "D"),
+                new KeyValue<>(5, "E")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            timestamp);
+    }
+
+
+    private void createTopics() {
+        streamOneInput = "stream-one-" + testNo;
+        outputTopic = "output-" + testNo;
+        CLUSTER.createTopic(streamOneInput, 3, 1);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private void startStreams() {
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+    }
+
+
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+                                                            keyDeserializer,
+                                                        final Deserializer<V>
+                                                            valueDeserializer,
+                                                        final int numMessages)
+        throws InterruptedException {
+        final Properties consumerProperties = new Properties();
+        consumerProperties
+            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
+            testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
+            outputTopic,
+            numMessages, 60 * 1000);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 6da2a95..e5560c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -43,6 +43,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -55,11 +59,14 @@ import java.util.concurrent.ExecutionException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
+@RunWith(Parameterized.class)
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
+
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
     private KStreamBuilder builder;
@@ -73,6 +80,14 @@ public class KStreamAggregationIntegrationTest {
     private Aggregator<String, String, Integer> aggregator;
     private KStream<Integer, String> stream;
 
+    @Parameter
+    public long cacheSizeBytes;
+
+    //Single parameter, use Object[]
+    @Parameters
+    public static Object[] data() {
+        return new Object[] {0, 10 * 1024 * 1024L};
+    }
 
     @Before
     public void before() {
@@ -87,6 +102,8 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 4a13482..02beee3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -36,10 +36,16 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
@@ -51,22 +57,62 @@ import static org.junit.Assert.assertThat;
  * End-to-end integration test that demonstrates how to perform a join between a KStream and a
  * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
  */
+
+@RunWith(Parameterized.class)
 public class KStreamKTableJoinIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
-    private static final String USER_CLICKS_TOPIC = "user-clicks";
-    private static final String USER_REGIONS_TOPIC = "user-regions";
-    private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name";
-    private static final String OUTPUT_TOPIC = "output-topic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(USER_CLICKS_TOPIC);
-        CLUSTER.createTopic(USER_REGIONS_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
+    private String userClicksTopic;
+    private String userRegionsTopic;
+    private String userRegionsStoreName;
+    private String outputTopic;
+    private static volatile int testNo = 0;
+    private KafkaStreams kafkaStreams;
+    private Properties streamsConfiguration;
+
+    @Before
+    public void before() {
+        testNo++;
+        userClicksTopic = "user-clicks-" + testNo;
+        userRegionsTopic = "user-regions-" + testNo;
+        userRegionsStoreName = "user-regions-store-name-" + testNo;
+        outputTopic = "output-topic-" + testNo;
+        CLUSTER.createTopic(userClicksTopic);
+        CLUSTER.createTopic(userRegionsTopic);
+        CLUSTER.createTopic(outputTopic);
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+            TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Parameter
+    public long cacheSizeBytes;
+
+    //Single parameter, use Object[]
+    @Parameters
+    public static Object[] data() {
+        return new Object[] {0, 10 * 1024 * 1024L};
+
     }
 
     /**
@@ -140,26 +186,13 @@ public class KStreamKTableJoinIntegrationTest {
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
         final KStreamBuilder builder = new KStreamBuilder();
 
         // This KStream contains information such as "alice" -> 13L.
         //
         // Because this is a KStream ("record stream"), multiple records for the same user will be
         // considered as separate click-count events, each of which will be added to the total count.
-        final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
-
+        final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, userClicksTopic);
         // This KTable contains information such as "alice" -> "europe".
         //
         // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
@@ -172,7 +205,8 @@ public class KStreamKTableJoinIntegrationTest {
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
         final KTable<String, String> userRegionsTable =
-            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC, USER_REGIONS_STORE_NAME);
+            builder.table(stringSerde, stringSerde, userRegionsTopic, userRegionsStoreName);
+
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.
         //
@@ -213,10 +247,10 @@ public class KStreamKTableJoinIntegrationTest {
             }, "ClicksPerRegionUnwindowed");
 
         // Write the (continuously updating) results to the output topic.
-        clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
+        clicksPerRegion.to(stringSerde, longSerde, outputTopic);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
 
         //
         // Step 2: Publish user-region information.
@@ -230,7 +264,8 @@ public class KStreamKTableJoinIntegrationTest {
         userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig, mockTime);
+        IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions, userRegionsProducerConfig, mockTime);
+
 
         //
         // Step 3: Publish some user click events.
@@ -241,7 +276,7 @@ public class KStreamKTableJoinIntegrationTest {
         userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig, mockTime);
+        IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks, userClicksProducerConfig, mockTime);
 
         //
         // Step 4: Verify the application's output data.
@@ -252,9 +287,10 @@ public class KStreamKTableJoinIntegrationTest {
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+
         final List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-            OUTPUT_TOPIC, expectedClicksPerRegion.size());
-        streams.close();
+            outputTopic, expectedClicksPerRegion.size());
+
         assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index e9a7da1..2ac9c47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -40,6 +40,11 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
+@RunWith(Parameterized.class)
 public class KStreamRepartitionJoinTest {
     private static final int NUM_BROKERS = 1;
 
@@ -73,11 +79,21 @@ public class KStreamRepartitionJoinTest {
     private String streamOneInput;
     private String streamTwoInput;
     private String streamFourInput;
+    private static volatile int testNo = 0;
+
+    @Parameter
+    public long cacheSizeBytes;
 
+    //Single parameter, use Object[]
+    @Parameters
+    public static Object[] data() {
+        return new Object[] {0, 10 * 1024 * 1024L};
+    }
 
     @Before
     public void before() {
-        final String applicationId = "kstream-repartition-join-test";
+        testNo++;
+        String applicationId = "kstream-repartition-join-test-" + testNo;
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
@@ -87,6 +103,8 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
@@ -112,8 +130,8 @@ public class KStreamRepartitionJoinTest {
 
     @Test
     public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
-        produceMessages();
 
+        produceMessages();
         final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
         final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
         final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
@@ -136,7 +154,7 @@ public class KStreamRepartitionJoinTest {
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() {
-        final String mapOneStreamAndJoinOutput = "map-one-join-output";
+        String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
         doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
     }
@@ -145,8 +163,8 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        doJoin(map1, map2, "map-both-streams-and-join");
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
+        doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
     }
 
 
@@ -162,7 +180,7 @@ public class KStreamRepartitionJoinTest {
                 }
             }).map(keyMapper);
 
-        final String outputTopic = "map-map-join";
+        final String outputTopic = "map-map-join-" + testNo;
         doJoin(mapMapStream, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
@@ -173,7 +191,7 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> keySelected =
             streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
 
-        final String outputTopic = "select-key-join";
+        final String outputTopic = "select-key-join-" + testNo;
         doJoin(keySelected, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
@@ -188,7 +206,7 @@ public class KStreamRepartitionJoinTest {
                 }
             });
 
-        final String outputTopic = "flat-map-join";
+        final String outputTopic = "flat-map-join-" + testNo;
         doJoin(flatMapped, streamTwo, outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
@@ -202,7 +220,8 @@ public class KStreamRepartitionJoinTest {
                 return value1 + ":" + value2;
             }
         };
-        final String output = "join-rhs-stream-mapped";
+
+        final String output = "join-rhs-stream-mapped-" + testNo;
         streamTwo
             .join(streamOne.map(keyMapper),
                 joiner,
@@ -220,7 +239,8 @@ public class KStreamRepartitionJoinTest {
 
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        final String outputTopic = "left-join";
+
+        final String outputTopic = "left-join-" + testNo;
         map1.leftJoin(map2,
             valueJoiner,
             getJoinWindow(),
@@ -253,7 +273,8 @@ public class KStreamRepartitionJoinTest {
                 return value1 + ":" + value2;
             }
         };
-        final String topic = "map-join-join";
+
+        final String topic = "map-join-join-" + testNo;
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
                 joiner,
@@ -348,11 +369,11 @@ public class KStreamRepartitionJoinTest {
     }
 
     private void createTopics() {
-        streamOneInput = "stream-one";
-        streamTwoInput = "stream-two";
-        streamFourInput = "stream-four";
+        streamOneInput = "stream-one-" + testNo;
+        streamTwoInput = "stream-two-" + testNo;
+        streamFourInput = "stream-four-" + testNo;
         CLUSTER.createTopic(streamOneInput);
-        CLUSTER.createTopic(streamTwoInput, 2, 1);
+        CLUSTER.createTopic(streamTwoInput);
         CLUSTER.createTopic(streamFourInput);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 04d36f1..6c17d2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -44,12 +44,19 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -66,48 +73,67 @@ import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+
+
+@RunWith(Parameterized.class)
 public class QueryableStateIntegrationTest {
-    private static final int NUM_BROKERS = 2;
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
-    private static final String STREAM_ONE = "stream-one";
-    private static final String STREAM_TWO = "stream-two";
-    private static final String STREAM_CONCURRENT = "stream-concurrent";
-    private static final String OUTPUT_TOPIC = "output";
-    private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent";
-    private static final String STREAM_THREE = "stream-three";
-    private static final int NUM_PARTITIONS = NUM_BROKERS;
-    private static final int NUM_REPLICAS = NUM_BROKERS;
+    private String streamOne = "stream-one";
+    private String streamTwo = "stream-two";
+    private String streamThree = "stream-three";
+    private String streamConcurrent = "stream-concurrent";
+    private String outputTopic = "output";
+    private String outputTopicConcurrent = "output-concurrent";
+    private String outputTopicThree = "output-three";
     // sufficiently large window size such that everything falls into 1 window
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
-    private static final String OUTPUT_TOPIC_THREE = "output-three";
+    private static final int NUM_PARTITIONS = 2;
+    private static final int NUM_REPLICAS = NUM_BROKERS;
     private Properties streamsConfiguration;
     private List<String> inputValues;
     private Set<String> inputValuesKeys;
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
+    private static int testNo = 0;
+
+    public void createTopics() {
+        streamOne = streamOne + "-" + testNo;
+        streamConcurrent = streamConcurrent + "-" + testNo;
+        streamThree = streamThree + "-" + testNo;
+        outputTopic = outputTopic + "-" + testNo;
+        outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
+        outputTopicThree = outputTopicThree + "-" + testNo;
+        streamTwo = streamTwo + "-" + testNo;
+        CLUSTER.createTopic(streamOne);
+        CLUSTER.createTopic(streamConcurrent);
+        CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS);
+        CLUSTER.createTopic(streamThree, 4, 1);
+        CLUSTER.createTopic(outputTopic);
+        CLUSTER.createTopic(outputTopicConcurrent);
+        CLUSTER.createTopic(outputTopicThree);
+    }
 
-    @BeforeClass
-    public static void createTopics() {
-        CLUSTER.createTopic(STREAM_ONE);
-        CLUSTER.createTopic(STREAM_CONCURRENT);
-        CLUSTER.createTopic(STREAM_TWO, NUM_PARTITIONS, NUM_REPLICAS);
-        CLUSTER.createTopic(STREAM_THREE, 4, 1);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT);
-        CLUSTER.createTopic(OUTPUT_TOPIC_THREE);
+    @Parameter
+    public long cacheSizeBytes;
+
+    //Single parameter, use Object[]
+    @Parameters
+    public static Object[] data() {
+        return new Object[]{0, 10 * 1024 * 1024L};
     }
 
     @Before
     public void before() throws IOException {
+        testNo++;
+        createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "queryable-state";
+        final String applicationId = "queryable-state-" + testNo;
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
@@ -116,7 +142,11 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration
+            .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
cacheSizeBytes);
+
 
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
@@ -244,13 +274,13 @@ public class QueryableStateIntegrationTest {
                     final ReadOnlyKeyValueStore<String, Long> store;
                     try {
                         store = streamsWithKey.store(storeName, 
QueryableStoreTypes.<String, Long>keyValueStore());
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but 
rebalance hasn't happened
-                        return false;
                     } catch (final InvalidStateStoreException e) {
                         // rebalance
                         return false;
-                    }
+                    } catch (final IllegalStateException e) {
+                        // Kafka Streams instance may have closed but 
rebalance hasn't happened
+                        return false;
+                    } 
 
                     return store != null && store.get(key) != null;
                 }
@@ -297,25 +327,25 @@ public class QueryableStateIntegrationTest {
         final int numIterations = 500000;
 
         // create concurrent producer
-        final ProducerRunnable producerRunnable = new 
ProducerRunnable(STREAM_TWO, inputValues, numIterations);
+        final ProducerRunnable producerRunnable = new 
ProducerRunnable(streamThree, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
 
         // create three stream threads
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(STREAM_TWO, 
OUTPUT_TOPIC_THREE, i);
+            streamRunnables[i] = new StreamRunnable(streamThree, 
outputTopicThree, i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
         producerThread.start();
 
         try {
-            waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_THREE, 1);
+            waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
 
             for (int i = 0; i < numThreads; i++) {
                 verifyAllKVKeys(streamRunnables, 
streamRunnables[i].getStream(), inputValuesKeys,
-                    "word-count-store-" + STREAM_TWO);
+                    "word-count-store-" + streamThree);
                 verifyAllWindowedKeys(streamRunnables, 
streamRunnables[i].getStream(), inputValuesKeys,
-                    "windowed-word-count-store-" + STREAM_TWO, 0L, 
WINDOW_SIZE);
+                                      "windowed-word-count-store-" + 
streamThree, 0L, WINDOW_SIZE);
             }
 
             // kill N-1 threads
@@ -327,9 +357,9 @@ public class QueryableStateIntegrationTest {
 
             // query from the remaining thread
             verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), 
inputValuesKeys,
-                "word-count-store-" + STREAM_TWO);
+                "word-count-store-" + streamThree);
             verifyAllWindowedKeys(streamRunnables, 
streamRunnables[0].getStream(), inputValuesKeys,
-                "windowed-word-count-store-" + STREAM_TWO, 0L, WINDOW_SIZE);
+                                  "windowed-word-count-store-" + streamThree, 
0L, WINDOW_SIZE);
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {
@@ -349,20 +379,21 @@ public class QueryableStateIntegrationTest {
 
         final int numIterations = 500000;
 
-        final ProducerRunnable producerRunnable = new 
ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations);
+        final ProducerRunnable producerRunnable = new 
ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(STREAM_CONCURRENT, 
OUTPUT_TOPIC_CONCURRENT, streamsConfiguration);
+        kafkaStreams = createCountStream(streamConcurrent, 
outputTopicConcurrent, streamsConfiguration);
+
         kafkaStreams.start();
         producerThread.start();
 
         try {
-            waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_CONCURRENT, 1);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
 
             final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store("word-count-store-" + 
STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>keyValueStore());
+                keyValueStore = kafkaStreams.store("word-count-store-" + 
streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
 
             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store("windowed-word-count-store-" + 
STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>windowStore());
+                kafkaStreams.store("windowed-word-count-store-" + 
streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
 
 
             final Map<String, Long> expectedWindowState = new HashMap<>();
@@ -401,7 +432,7 @@ public class QueryableStateIntegrationTest {
         }
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
-            STREAM_ONE,
+                streamOne,
             batch1,
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
@@ -410,16 +441,16 @@ public class QueryableStateIntegrationTest {
                 new Properties()),
             mockTime);
 
-        final KStream<String, String> s1 = builder.stream(STREAM_ONE);
+        final KStream<String, String> s1 = builder.stream(streamOne);
 
         // Non Windowed
-        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), 
OUTPUT_TOPIC);
+        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), 
outputTopic);
 
         s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();
 
-        waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC, 1);
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
             myCount = kafkaStreams.store("my-count", 
QueryableStoreTypes.<String, Long>keyValueStore());
@@ -439,7 +470,7 @@ public class QueryableStateIntegrationTest {
     @Test
     public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws 
Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        final KStream<String, String> stream = builder.stream(STREAM_THREE);
+        final KStream<String, String> stream = builder.stream(streamThree);
 
         final String storeName = "count-by-key";
         stream.groupByKey().count(storeName);
@@ -448,7 +479,7 @@ public class QueryableStateIntegrationTest {
 
         final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                STREAM_THREE,
+                streamThree,
                 Arrays.asList(hello, hello, hello, hello, hello, hello, hello, 
hello),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
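
The hunks above turn QueryableStateIntegrationTest into a JUnit 4 parameterized test: each run receives a cacheSizeBytes value (0 bytes or 10 MB) that is written into StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, and the testNo suffix on the topic and application-id names keeps the runs isolated on the shared embedded cluster. A minimal, self-contained sketch of the same single-parameter mechanism follows; the class name and assertion are illustrative only, and the @RunWith(Parameterized.class) annotation is assumed to be present on the real test class outside this hunk.

    import static org.junit.Assert.assertTrue;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameter;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class CacheSizeParameterSketch {

        // With exactly one test parameter, JUnit 4.12+ accepts a flat Object[]
        // from the @Parameters method; each element becomes one run of every @Test.
        @Parameters
        public static Object[] data() {
            return new Object[]{0, 10 * 1024 * 1024L};
        }

        @Parameter // injected by field, index 0 by default
        public long cacheSizeBytes;

        @Test
        public void cacheSizeIsNonNegative() {
            assertTrue(cacheSizeBytes >= 0);
        }
    }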

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 0846066..9b3a90b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -18,15 +18,24 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
 
 public class KGroupedTableImplTest {
 
@@ -73,4 +82,41 @@ public class KGroupedTableImplTest {
     public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
         groupedTable.reduce(MockReducer.STRING_ADDER, 
MockReducer.STRING_REMOVER, null);
     }
+
+    @Test
+    public void shouldReduce() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String topic = "input";
+        final KTable<String, Integer> reduced = builder.table(Serdes.String(), 
Serdes.Integer(), topic, "store")
+                .groupBy(MockKeyValueMapper.<String, 
Integer>NoOpKeyValueMapper())
+                .reduce(MockReducer.INTEGER_ADDER, 
MockReducer.INTEGER_SUBTRACTOR, "reduced");
+
+        final Map<String, Integer> results = new HashMap<>();
+        reduced.foreach(new ForeachAction<String, Integer>() {
+            @Override
+            public void apply(final String key, final Integer value) {
+                results.put(key, value);
+            }
+        });
+
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, 
TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
+        driver.setTime(10L);
+        driver.process(topic, "A", 1);
+        driver.process(topic, "B", 2);
+        driver.flushState();
+
+        assertEquals(Integer.valueOf(1), results.get("A"));
+        assertEquals(Integer.valueOf(2), results.get("B"));
+
+        driver.process(topic, "A", 2);
+        driver.process(topic, "B", 1);
+        driver.process(topic, "A", 5);
+        driver.process(topic, "B", 6);
+        driver.flushState();
+
+        assertEquals(Integer.valueOf(5), results.get("A"));
+        assertEquals(Integer.valueOf(6), results.get("B"));
+
+    }
 }
\ No newline at end of file
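
The new shouldReduce test exercises KGroupedTable#reduce, which takes both an adder and a subtractor because a grouped KTable sees every update as an old/new value pair: the subtractor removes the row's previous value from the aggregate before the adder applies the new one. With the inputs above, key A goes 1 -> (1 - 1 + 2) = 2 -> (2 - 2 + 5) = 5, which is why the final assertion expects 5. A sketch of such a reducer pair, standing in for the MockReducer constants (parameter names and the exact calling convention are my reading of the API, not taken from the patch):

    import org.apache.kafka.streams.kstream.Reducer;

    // Adder: combines the current aggregate with a record's new value.
    final Reducer<Integer> adder = new Reducer<Integer>() {
        @Override
        public Integer apply(final Integer aggregate, final Integer newValue) {
            return aggregate + newValue;
        }
    };

    // Subtractor: removes a record's old value from the aggregate, so a
    // replaced or deleted table row no longer contributes to the total.
    final Reducer<Integer> subtractor = new Reducer<Integer>() {
        @Override
        public Integer apply(final Integer aggregate, final Integer oldValue) {
            return aggregate - oldValue;
        }
    };

    // Wired up as in the test: table.groupBy(...).reduce(adder, subtractor, "reduced");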

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 7175f63..596d246 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -285,14 +287,14 @@ public class KStreamKStreamJoinTest {
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
 
         driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(time);
+
 
         // push two items to the primary stream. the other window is empty. 
this should produce no items.
         // w1 = {}
         // w2 = {}
         // --> w1 = { 0:X1, 1:X1 }
         //     w2 = {}
-
+        setRecordContext(time, topic1);
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
@@ -305,6 +307,7 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X1, 1:X1 }
         //     w2 = { 0:Y0, 1:Y1 }
 
+        setRecordContext(time, topic2);
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
@@ -313,9 +316,9 @@ public class KStreamKStreamJoinTest {
 
         // clear logically
         time = 1000L;
-
+        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.setTime(time + i);
+            setRecordContext(time + i, topic1);
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
         processor.checkAndClearProcessResult();
@@ -324,7 +327,7 @@ public class KStreamKStreamJoinTest {
         // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
 
         time = 1000 + 100L;
-        driver.setTime(time);
+        setRecordContext(time, topic2);
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
@@ -332,28 +335,28 @@ public class KStreamKStreamJoinTest {
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", 
"3:X3+YY3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
@@ -363,35 +366,35 @@ public class KStreamKStreamJoinTest {
         // go back to the time before expiration
 
         time = 1000L - 100L - 1L;
-        driver.setTime(time);
+        setRecordContext(time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult();
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic2);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
@@ -400,9 +403,8 @@ public class KStreamKStreamJoinTest {
 
         // clear (logically)
         time = 2000L;
-
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.setTime(time + i);
+            setRecordContext(time + i, topic2);
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
 
@@ -412,35 +414,35 @@ public class KStreamKStreamJoinTest {
         // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
         time = 2000L + 100L;
-        driver.setTime(time);
+        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", 
"3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
@@ -450,39 +452,43 @@ public class KStreamKStreamJoinTest {
         // go back to the time before expiration
 
         time = 2000L - 100L - 1L;
-        driver.setTime(time);
+        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult();
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
     }
+
+    private void setRecordContext(final long time, final String topic) {
+        ((MockProcessorContext) driver.context()).setRecordContext(new 
ProcessorRecordContext(time, 0, 0, topic));
+    }
 }
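
The recurring change in this file replaces driver.setTime(...) with the setRecordContext(...) helper added at the bottom. My reading is that, with the caching changes, the join processors and window stores obtain the record timestamp from the processor context's record metadata rather than from the test driver's clock, so each batch of test records is now stamped explicitly. The ProcessorRecordContext arguments, as I understand them, are (timestamp, offset, partition, topic); the zero offset and partition are placeholders. A condensed example of the idiom, using the fixtures from the diff:

    long time = 1000L;
    setRecordContext(time, topic1);                     // records below carry timestamp 1000
    for (int i = 0; i < expectedKeys.length; i++) {
        driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
    }

    setRecordContext(time + 100L, topic2);              // advance event time to the edge of the join window
    driver.process(topic2, expectedKeys[0], "YY" + expectedKeys[0]);
    processor.checkAndClearProcessResult("0:X0+YY0");   // only entries still inside the window join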

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 95d9ef6..80512d9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -97,7 +99,7 @@ public class KStreamKStreamLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce two items.
@@ -107,7 +109,7 @@ public class KStreamKStreamLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -117,7 +119,7 @@ public class KStreamKStreamLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", 
"2:X2+null", "3:X3+null");
 
         // push all items to the other stream. this should produce no items.
@@ -127,7 +129,7 @@ public class KStreamKStreamLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -137,7 +139,7 @@ public class KStreamKStreamLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", 
"1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
     }
 
@@ -167,116 +169,122 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
 
         driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(time);
 
         // push two items to the primary stream. the other window is empty. 
this should produce two items
         // w = {}
         // --> w = {}
 
+        setRecordContext(time, topic1);
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce no items.
         // w = {}
         // --> w = { 0:Y0, 1:Y1 }
 
+        setRecordContext(time, topic2);
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult();
 
         // clear logically
         time = 1000L;
+        setRecordContext(time, topic2);
 
         // push all items to the other stream. this should produce no items.
         // w = {}
         // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.setTime(time + i);
+            setRecordContext(time + i, topic2);
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult();
 
         // gradually expire items in window.
         // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
         time = 1000L + 100L;
-        driver.setTime(time);
+        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+Y2", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+null", "3:XX3+Y3");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+null", "3:XX3+null");
 
         // go back to the time before expiration
 
         time = 1000L - 100L - 1L;
-        driver.setTime(time);
+        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+null", "3:XX3+null");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", 
"2:XX2+null", "3:XX3+null");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+null", "3:XX3+null");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+null");
 
-        driver.setTime(++time);
+        setRecordContext(++time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
     }
+
+    private void setRecordContext(final long time, final String topic) {
+        ((MockProcessorContext) driver.context()).setRecordContext(new 
ProcessorRecordContext(time, 0, 0, topic));
+    }
 }
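
The left-join test also gains a driver.flushState() call after every batch of process() calls, presumably because state-store writes (and the records they forward) can now sit buffered in the caches until a flush pushes them downstream; flushing before each assertion keeps the expected output deterministic. The pattern, condensed from the hunks above:

    setRecordContext(time, topic1);
    for (int i = 0; i < 2; i++) {
        driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
    }
    driver.flushState();   // evict anything buffered in the caches so it reaches the mock processor
    processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");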
