Repository: kafka
Updated Branches:
  refs/heads/trunk fd6d7bcf3 -> 1974e1b0e


KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the 
constructor, which needs to be dismissed in the close() call; however in some 
tests, the generated topology is not initialized at all, and hence the 
corresponding state stores are supposed to not be able to be closed as well 
since their `init` function is not called. This could cause the above option 
objects to be not released.

This is fixed in this patch to move the logic out of constructor and inside 
`init` functions, so that no RocksDB objects will be created in the constructor 
only. Also some minor cleanups:

1. In KStreamTestDriver.close(), we lost the logic to close the state stores 
but only call `flush`; it is now changed back to call both.
2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to 
remove the mutual dependency: these functions should really be in 
ProcessorContext, not the test driver.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax 
<matth...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1974e1b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1974e1b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1974e1b0

Branch: refs/heads/trunk
Commit: 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3
Parents: fd6d7bc
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Tue Jan 17 20:29:55 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Jan 17 20:31:31 2017 -0800

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   |  21 +---
 .../streams/kstream/KStreamBuilderTest.java     |  58 +++++-----
 .../internals/KStreamKTableLeftJoinTest.java    |   2 -
 ...reamSessionWindowAggregateProcessorTest.java |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   2 +-
 .../internals/CachingKeyValueStoreTest.java     |   2 +-
 .../internals/CachingSessionStoreTest.java      |   2 +-
 .../state/internals/CachingWindowStoreTest.java |   2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java    |   3 +-
 .../ChangeLoggingKeyValueStoreTest.java         |   3 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   3 +-
 .../MeteredSegmentedBytesStoreTest.java         |   3 +-
 .../RocksDBKeyValueStoreSupplierTest.java       |   9 +-
 .../RocksDBSegmentedBytesStoreTest.java         |   3 +-
 .../RocksDBSessionStoreSupplierTest.java        |   9 +-
 .../internals/RocksDBSessionStoreTest.java      |   3 +-
 .../RocksDBWindowStoreSupplierTest.java         |   9 +-
 .../state/internals/RocksDBWindowStoreTest.java |  24 ++--
 .../state/internals/SegmentIteratorTest.java    |  27 +++--
 .../streams/state/internals/SegmentsTest.java   |   3 +-
 .../apache/kafka/test/KStreamTestDriver.java    | 113 +++++--------------
 .../apache/kafka/test/MockProcessorContext.java |  57 +++++++---
 22 files changed, 153 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3f8d509..55c1bb8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -94,25 +94,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
 
     protected volatile boolean open = false;
 
-
-    public RocksDBStore(final String name,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
-
-    public RocksDBStore(final String name,
-                        final String parentDir,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> 
valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+    }
 
+    @SuppressWarnings("unchecked")
+    public void openDB(ProcessorContext context) {
         // initialize the default rocksdb options
-        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
         tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
         tableConfig.setBlockSize(BLOCK_SIZE);
 
@@ -125,16 +121,12 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         options.setCreateIfMissing(true);
         options.setErrorIfExists(false);
 
-
         wOptions = new WriteOptions();
         wOptions.setDisableWAL(true);
 
         fOptions = new FlushOptions();
         fOptions.setWaitForFlush(true);
-    }
 
-    @SuppressWarnings("unchecked")
-    public void openDB(ProcessorContext context) {
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass = 
(Class<RocksDBConfigSetter>) 
configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
         if (configSetterClass != null) {
@@ -464,5 +456,4 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
             return super.hasNext() && comparator.compare(super.peekRawKey(), 
this.rawToKey) <= 0;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index c32082c..5f126c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashSet;
@@ -41,8 +42,17 @@ import static org.junit.Assert.assertTrue;
 
 public class KStreamBuilderTest {
 
+    private static final String APP_ID = "app-id";
+
+    private final KStreamBuilder builder = new KStreamBuilder();
+
     private KStreamTestDriver driver = null;
 
+    @Before
+    public void setUp() {
+        builder.setApplicationId(APP_ID);
+    }
+
     @After
     public void cleanup() {
         if (driver != null) {
@@ -53,8 +63,6 @@ public class KStreamBuilderTest {
 
     @Test(expected = TopologyBuilderException.class)
     public void testFrom() {
-        final KStreamBuilder builder = new KStreamBuilder();
-
         builder.stream("topic-1", "topic-2");
 
         builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
@@ -62,17 +70,15 @@ public class KStreamBuilderTest {
 
     @Test
     public void testNewName() {
-        KStreamBuilder builder = new KStreamBuilder();
-
         assertEquals("X-0000000000", builder.newName("X-"));
         assertEquals("Y-0000000001", builder.newName("Y-"));
         assertEquals("Z-0000000002", builder.newName("Z-"));
 
-        builder = new KStreamBuilder();
+        KStreamBuilder newBuilder = new KStreamBuilder();
 
-        assertEquals("X-0000000000", builder.newName("X-"));
-        assertEquals("Y-0000000001", builder.newName("Y-"));
-        assertEquals("Z-0000000002", builder.newName("Z-"));
+        assertEquals("X-0000000000", newBuilder.newName("X-"));
+        assertEquals("Y-0000000001", newBuilder.newName("Y-"));
+        assertEquals("Z-0000000002", newBuilder.newName("Z-"));
     }
 
     @Test
@@ -80,8 +86,6 @@ public class KStreamBuilderTest {
         String topic1 = "topic-1";
         String topic2 = "topic-2";
 
-        KStreamBuilder builder = new KStreamBuilder();
-
         KStream<String, String> source1 = builder.stream(topic1);
         KStream<String, String> source2 = builder.stream(topic2);
         KStream<String, String> merged = builder.merge(source1, source2);
@@ -105,7 +109,6 @@ public class KStreamBuilderTest {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
         final String topic3 = "topic-3";
-        final KStreamBuilder builder = new KStreamBuilder();
         final KStream<String, String> source1 = builder.stream(topic1);
         final KStream<String, String> source2 = builder.stream(topic2);
         final KStream<String, String> source3 = builder.stream(topic3);
@@ -131,28 +134,26 @@ public class KStreamBuilderTest {
         final KStream<String, String> merged = builder.merge(processedSource1, 
processedSource2, source3);
         merged.groupByKey().count("my-table");
         final Map<String, Set<String>> actual = 
builder.stateStoreNameToSourceTopics();
+
         assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), 
actual.get("my-table"));
     }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
-        new KStreamBuilder().stream();
+        builder.stream();
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
-        new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, 
null);
+        builder.stream(Serdes.String(), Serdes.String(), null, null);
     }
 
     @Test
     public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() 
throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("app-id");
-
         builder.table("topic1", "table1");
         builder.table("topic2", null);
 
-        ProcessorTopology topology = builder.build(null);
+        final ProcessorTopology topology = builder.build(null);
 
         assertEquals(1, topology.stateStores().size());
         assertEquals("table1", topology.stateStores().get(0).name());
@@ -162,23 +163,26 @@ public class KStreamBuilderTest {
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
         builder.globalTable("table", "globalTable");
+
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
-        final StateStore store = stateStores.iterator().next();
+
         assertEquals(1, stateStores.size());
-        assertEquals("globalTable", store.name());
+        assertEquals("globalTable", stateStores.get(0).name());
     }
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() throws 
Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
         builder.globalTable("table", "globalTable");
         builder.globalTable("table2", "globalTable2");
+
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
+
         final List<StateStore> stateStores = topology.globalStateStores();
-        assertEquals(Utils.mkSet("table", "table2"), topology.sourceTopics());
+        final Set<String> sourceTopics = topology.sourceTopics();
+
+        assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
         assertEquals(2, stateStores.size());
     }
 
@@ -186,7 +190,6 @@ public class KStreamBuilderTest {
     public void shouldAddGlobalTablesToEachGroup() throws Exception {
         final String one = "globalTable";
         final String two = "globalTable2";
-        final KStreamBuilder builder = new KStreamBuilder();
         final GlobalKTable<String, String> globalTable = 
builder.globalTable("table", one);
         final GlobalKTable<String, String> globalTable2 = 
builder.globalTable("table2", two);
 
@@ -203,7 +206,7 @@ public class KStreamBuilderTest {
         stream.leftJoin(globalTable, kvMapper, 
MockValueJoiner.TOSTRING_JOINER);
         final KStream<String, String> stream2 = builder.stream("t2");
         stream2.leftJoin(globalTable2, kvMapper, 
MockValueJoiner.TOSTRING_JOINER);
-        builder.setApplicationId("app-id");
+
         final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
         for (Integer groupId : nodeGroups.keySet()) {
             final ProcessorTopology topology = builder.build(groupId);
@@ -212,6 +215,7 @@ public class KStreamBuilderTest {
             for (StateStore stateStore : stateStores) {
                 names.add(stateStore.name());
             }
+
             assertEquals(2, stateStores.size());
             assertTrue(names.contains(one));
             assertTrue(names.contains(two));
@@ -220,9 +224,6 @@ public class KStreamBuilderTest {
 
     @Test
     public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("app-id");
-
         final KStream<String, String> playEvents = builder.stream("events");
 
         final KTable<String, String> table = builder.table("table-topic", 
"table-store");
@@ -230,7 +231,8 @@ public class KStreamBuilderTest {
 
         final KStream<String, String> mapped = 
playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
         mapped.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
+
         assertEquals(Collections.singleton("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
-        
assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"),
 builder.stateStoreNameToSourceTopics().get("count"));
+        assertEquals(Collections.singleton(APP_ID + 
"-KSTREAM-MAP-0000000003-repartition"), 
builder.stateStoreNameToSourceTopics().get("count"));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 569ea5a..b6988e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -141,6 +141,4 @@ public class KStreamKTableLeftJoinTest {
 
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+YY2", "3:XX3+YY3");
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c3368a1..2e5b201 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -32,7 +31,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -91,7 +89,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(new KStreamTestDriver(new 
KStreamBuilder(), stateDir), stateDir,
+        context = new MockProcessorContext(stateDir,
             Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new 
ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index c3df49d..efa0e0e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -231,7 +231,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
 
 
-        this.context = new MockProcessorContext(null, this.stateDir, 
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        this.context = new MockProcessorContext(this.stateDir, 
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             @Override
             public TaskId taskId() {
                 return new TaskId(0, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 8746a86..a00526f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -63,7 +63,7 @@ public class CachingKeyValueStoreTest {
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), 
Serdes.String());
         store.setFlushListener(cacheFlushListener);
         cache = new ThreadCache("testCache", maxCacheSizeBytes, new 
MockStreamsMetrics(new Metrics()));
-        final MockProcessorContext context = new MockProcessorContext(null, 
null, null, null, (RecordCollector) null, cache);
+        final MockProcessorContext context = new MockProcessorContext(null, 
null, null, (RecordCollector) null, cache);
         topic = "topic";
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
         store.init(context, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 5035f70..65a249e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -64,7 +64,7 @@ public class CachingSessionStoreTest {
                                                  Serdes.String(),
                                                  Serdes.Long());
         cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new 
MockStreamsMetrics(new Metrics()));
-        final MockProcessorContext context = new MockProcessorContext(null, 
TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) 
null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, "topic"));
         cachingStore.init(context, cachingStore);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 1de1002..2728aa0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -69,7 +69,7 @@ public class CachingWindowStoreTest {
         cachingStore.setFlushListener(cacheListener);
         cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new 
MockStreamsMetrics(new Metrics()));
         topic = "topic";
-        final MockProcessorContext context = new MockProcessorContext(null, 
TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) 
null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, topic));
         cachingStore.init(context, cachingStore);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 82fb831..99b1347 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -57,8 +57,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
                 sent.put(record.key(), record.value());
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
index 8815c5a..442602c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
@@ -65,8 +65,7 @@ public class ChangeLoggingKeyValueStoreTest {
                 sent.put(record.key(), record.value());
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
index 621feb3..51f31bf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -52,8 +52,7 @@ public class ChangeLoggingSegmentedBytesStoreTest {
                 sent.put(record.key(), record.value());
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
index 6306512..9160a73 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -93,8 +93,7 @@ public class MeteredSegmentedBytesStoreTest {
 
         };
 
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       new 
NoOpRecordCollector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 3d9a56c..c510089 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -44,8 +44,7 @@ public class RocksDBKeyValueStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(null,
-                                                                          
TestUtils.tempDirectory(),
+    private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                           
Serdes.String(),
                                                                           
Serdes.String(),
                                                                           new 
NoOpRecordCollector(),
@@ -67,8 +66,7 @@ public class RocksDBKeyValueStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,
@@ -89,8 +87,7 @@ public class RocksDBKeyValueStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 7fe490c..3763290 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -62,8 +62,7 @@ public class RocksDBSegmentedBytesStoreTest {
                                                     new SessionKeySchema());
 
         stateDir = TestUtils.tempDirectory();
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      stateDir,
+        final MockProcessorContext context = new MockProcessorContext(stateDir,
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       new 
NoOpRecordCollector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 28196a2..6677624 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -46,8 +46,7 @@ public class RocksDBSessionStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(null,
-                                                                          
TestUtils.tempDirectory(),
+    private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                           
Serdes.String(),
                                                                           
Serdes.String(),
                                                                           new 
NoOpRecordCollector(),
@@ -70,8 +69,7 @@ public class RocksDBSessionStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,
@@ -92,8 +90,7 @@ public class RocksDBSessionStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 5a23a1c..e1801b8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,8 +53,7 @@ public class RocksDBSessionStoreTest {
                                                  Serdes.String(),
                                                  Serdes.Long());
 
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.Long(),
                                                                       new 
NoOpRecordCollector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 897ec62..d9a0d4f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -45,8 +45,7 @@ public class RocksDBWindowStoreSupplierTest {
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
     private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(null,
-                                                                          
TestUtils.tempDirectory(),
+    private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                           
Serdes.String(),
                                                                           
Serdes.String(),
                                                                           new 
NoOpRecordCollector(),
@@ -67,8 +66,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,
@@ -89,8 +87,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(record);
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
+        final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                       
Serdes.String(),
                                                                       
Serdes.String(),
                                                                       
collector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 79223de..ee846f7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -100,7 +100,7 @@ public class RocksDBWindowStoreTest {
         };
 
         MockProcessorContext context = new MockProcessorContext(
-                null, baseDir,
+                baseDir,
                 byteArraySerde, byteArraySerde,
                 recordCollector, cache);
 
@@ -151,7 +151,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -225,7 +225,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -314,7 +314,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -401,7 +401,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -457,7 +457,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                null, baseDir,
+                baseDir,
                 byteArraySerde, byteArraySerde,
                 recordCollector, cache);
 
@@ -485,7 +485,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -613,7 +613,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -662,7 +662,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -710,7 +710,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -810,7 +810,7 @@ public class RocksDBWindowStoreTest {
             };
 
             MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
+                    baseDir,
                     byteArraySerde, byteArraySerde,
                     recordCollector, cache);
 
@@ -870,7 +870,7 @@ public class RocksDBWindowStoreTest {
         };
 
         MockProcessorContext context = new MockProcessorContext(
-                null, baseDir,
+                baseDir,
                 byteArraySerde, byteArraySerde,
                 recordCollector, cache);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 3d2da31..ae6fb5a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -48,14 +48,16 @@ public class SegmentIteratorTest {
         }
     };
 
+    private SegmentIterator iterator = null;
+
     @Before
     public void before() {
-        final MockProcessorContext context = new MockProcessorContext(null,
-                                                                      
TestUtils.tempDirectory(),
-                                                                      
Serdes.String(),
-                                                                      
Serdes.String(),
-                                                                      new 
NoOpRecordCollector(),
-                                                                      new 
ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        final MockProcessorContext context = new MockProcessorContext(
+                TestUtils.tempDirectory(),
+                Serdes.String(),
+                Serdes.String(),
+                new NoOpRecordCollector(),
+                new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
@@ -67,13 +69,17 @@ public class SegmentIteratorTest {
 
     @After
     public void closeSegments() {
+        if (iterator != null) {
+            iterator.close();
+            iterator = null;
+        }
         segmentOne.close();
         segmentTwo.close();
     }
 
     @Test
     public void shouldIterateOverAllSegments() throws Exception {
-        final SegmentIterator iterator = new SegmentIterator(
+        iterator = new SegmentIterator(
                 Arrays.asList(segmentOne,
                               segmentTwo).iterator(),
                 hasNextCondition,
@@ -101,7 +107,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldOnlyIterateOverSegmentsInRange() throws Exception {
-        final SegmentIterator iterator = new SegmentIterator(
+        iterator = new SegmentIterator(
                 Arrays.asList(segmentOne,
                               segmentTwo).iterator(),
                 hasNextCondition,
@@ -121,7 +127,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() throws 
Exception {
-        final SegmentIterator iterator = new SegmentIterator(
+        iterator = new SegmentIterator(
                 Arrays.asList(segmentOne,
                               segmentTwo).iterator(),
                 hasNextCondition,
@@ -133,7 +139,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnNextIfNoNext() throws Exception {
-        final SegmentIterator iterator = new SegmentIterator(
+        iterator = new SegmentIterator(
                 Arrays.asList(segmentOne,
                               segmentTwo).iterator(),
                 hasNextCondition,
@@ -146,5 +152,4 @@ public class SegmentIteratorTest {
     private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, 
byte[]> binaryKv) {
         return KeyValue.pair(new String(binaryKv.key.get()), new 
String(binaryKv.value));
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 47207ec..9e34e63 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -43,8 +43,7 @@ public class SegmentsTest {
 
     @Before
     public void createContext() {
-        context = new MockProcessorContext(null,
-                                           TestUtils.tempDirectory(),
+        context = new MockProcessorContext(TestUtils.tempDirectory(),
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index d51384c..207705c 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -42,14 +42,11 @@ import java.util.Set;
 
 public class KStreamTestDriver {
 
+    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
+
     private final ProcessorTopology topology;
     private final MockProcessorContext context;
     private final ProcessorTopology globalTopology;
-    private ThreadCache cache;
-    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
-    public final File stateDir;
-
-    private ProcessorNode currNode;
 
     public KStreamTestDriver(KStreamBuilder builder) {
         this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
@@ -78,9 +75,8 @@ public class KStreamTestDriver {
         builder.setApplicationId("TestDriver");
         this.topology = builder.build(null);
         this.globalTopology = builder.buildGlobalStateTopology();
-        this.stateDir = stateDir;
-        this.cache = new ThreadCache("testCache", cacheSize, new 
MockStreamsMetrics(new Metrics()));
-        this.context = new MockProcessorContext(this, stateDir, keySerde, 
valSerde, new MockRecordCollector(), cache);
+        ThreadCache cache = new ThreadCache("testCache", cacheSize, new 
MockStreamsMetrics(new Metrics()));
+        this.context = new MockProcessorContext(stateDir, keySerde, valSerde, 
new MockRecordCollector(), cache);
         this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, 
"topic"));
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -88,7 +84,6 @@ public class KStreamTestDriver {
             initTopology(globalTopology, globalTopology.globalStateStores());
         }
         initTopology(topology, topology.stateStores());
-
     }
 
     private void initTopology(final ProcessorTopology topology, final 
List<StateStore> stores) {
@@ -106,14 +101,17 @@ public class KStreamTestDriver {
         }
     }
 
+    public ProcessorTopology topology() {
+        return topology;
+    }
 
     public ProcessorContext context() {
         return context;
     }
 
     public void process(String topicName, Object key, Object value) {
-        final ProcessorNode previous = currNode;
-        currNode = topology.source(topicName);
+        final ProcessorNode prevNode = context.currentNode();
+        ProcessorNode currNode = topology.source(topicName);
         if (currNode == null && globalTopology != null) {
             currNode = globalTopology.source(topicName);
         }
@@ -121,33 +119,27 @@ public class KStreamTestDriver {
         // if currNode is null, check if this topic is a changelog topic;
         // if yes, skip
         if 
(topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
-            currNode = previous;
             return;
         }
         context.setRecordContext(createRecordContext(context.timestamp()));
         context.setCurrentNode(currNode);
         try {
-            forward(key, value);
+            context.forward(key, value);
         } finally {
-            currNode = null;
-            context.setCurrentNode(null);
+            context.setCurrentNode(prevNode);
         }
     }
 
-    private ProcessorRecordContext createRecordContext(long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, "topic");
-    }
-
-
     public void punctuate(long timestamp) {
+        final ProcessorNode prevNode = context.currentNode();
         for (ProcessorNode processor : topology.processors()) {
             if (processor.processor() != null) {
-                currNode = processor;
+                context.setRecordContext(createRecordContext(timestamp));
+                context.setCurrentNode(processor);
                 try {
-                    context.setRecordContext(createRecordContext(timestamp));
                     processor.processor().punctuate(timestamp);
                 } finally {
-                    currNode = null;
+                    context.setCurrentNode(prevNode);
                 }
             }
         }
@@ -157,59 +149,18 @@ public class KStreamTestDriver {
         context.setTime(timestamp);
     }
 
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value) {
-        ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
currNode.children()) {
-            currNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, int childIndex) {
-        ProcessorNode thisNode = currNode;
-        ProcessorNode childNode = (ProcessorNode<K, V>) 
thisNode.children().get(childIndex);
-        currNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currNode = thisNode;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, String childName) {
-        ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
thisNode.children()) {
-            if (childNode.name().equals(childName)) {
-                currNode = childNode;
-                try {
-                    childNode.process(key, value);
-                } finally {
-                    currNode = thisNode;
-                }
-                break;
-            }
-        }
-    }
-
     public void close() {
         // close all processors
         for (ProcessorNode node : topology.processors()) {
-            currNode = node;
+            context.setCurrentNode(node);
             try {
                 node.close();
             } finally {
-                currNode = null;
+                context.setCurrentNode(null);
             }
         }
 
-        flushState();
+        closeState();
     }
 
     public Set<String> allProcessorNames() {
@@ -245,24 +196,19 @@ public class KStreamTestDriver {
         }
     }
 
-    public void setCurrentNode(final ProcessorNode currentNode) {
-        currNode = currentNode;
-    }
-
-    public StateStore globalStateStore(final String storeName) {
-        if (globalTopology != null) {
-            for (final StateStore store : globalTopology.globalStateStores()) {
-                if (store.name().equals(storeName)) {
-                    return store;
-                }
-            }
+    private void closeState() {
+        for (StateStore stateStore : context.allStateStores().values()) {
+            stateStore.flush();
+            stateStore.close();
         }
-        return null;
     }
 
+    private ProcessorRecordContext createRecordContext(long timestamp) {
+        return new ProcessorRecordContext(timestamp, -1, -1, "topic");
+    }
 
     private class MockRecordCollector extends RecordCollectorImpl {
-        public MockRecordCollector() {
+        MockRecordCollector() {
             super(null, "KStreamTestDriver");
         }
 
@@ -280,12 +226,9 @@ public class KStreamTestDriver {
         }
 
         @Override
-        public void flush() {
-        }
+        public void flush() {}
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 5ae7112..93f0f42 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.test;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -48,7 +47,6 @@ import java.util.LinkedHashMap;
 
 public class MockProcessorContext implements InternalProcessorContext, 
RecordCollector.Supplier {
 
-    private final KStreamTestDriver driver;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
@@ -67,34 +65,34 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol
     private ProcessorNode currentNode;
 
     public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector 
collector) {
-        this(null, null, serdes.keySerde(), serdes.valueSerde(), collector, 
null);
+        this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
     }
 
-    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
+    public MockProcessorContext(File stateDir,
                                 Serde<?> keySerde,
                                 Serde<?> valSerde,
                                 final RecordCollector collector,
                                 final ThreadCache cache) {
-        this(driver, stateDir, keySerde, valSerde,
+        this(stateDir, keySerde, valSerde,
                 new RecordCollector.Supplier() {
                     @Override
                     public RecordCollector recordCollector() {
                         return collector;
                     }
-                }, cache);
+                },
+                cache);
     }
 
-    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
-                                Serde<?> keySerde,
-                                Serde<?> valSerde,
-                                RecordCollector.Supplier collectorSupplier,
+    public MockProcessorContext(final File stateDir,
+                                final Serde<?> keySerde,
+                                final Serde<?> valSerde,
+                                final RecordCollector.Supplier 
collectorSupplier,
                                 final ThreadCache cache) {
-        this.driver = driver;
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.recordCollectorSupplier = collectorSupplier;
-        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new 
JmxReporter()), time, true);
+        this.metrics = new Metrics(config, 
Collections.singletonList((MetricsReporter) new JmxReporter()), time, true);
         this.cache = cache;
         this.streamsMetrics = new MockStreamsMetrics(metrics);
     }
@@ -182,19 +180,45 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
-        driver.forward(key, value);
+        ProcessorNode thisNode = currentNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
thisNode.children()) {
+            currentNode = childNode;
+            try {
+                childNode.process(key, value);
+            } finally {
+                currentNode = thisNode;
+            }
+        }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value, int childIndex) {
-        driver.forward(key, value, childIndex);
+        ProcessorNode thisNode = currentNode;
+        ProcessorNode childNode = (ProcessorNode<K, V>) 
thisNode.children().get(childIndex);
+        currentNode = childNode;
+        try {
+            childNode.process(key, value);
+        } finally {
+            currentNode = thisNode;
+        }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value, String childName) {
-        driver.forward(key, value, childName);
+        ProcessorNode thisNode = currentNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) 
thisNode.children()) {
+            if (childNode.name().equals(childName)) {
+                currentNode = childNode;
+                try {
+                    childNode.process(key, value);
+                } finally {
+                    currentNode = thisNode;
+                }
+                break;
+            }
+        }
     }
 
 
@@ -268,8 +292,7 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol
 
     @Override
     public void setCurrentNode(final ProcessorNode currentNode) {
-        this.currentNode  = currentNode;
-        driver.setCurrentNode(currentNode);
+        this.currentNode = currentNode;
     }
 
     @Override

Reply via email to