This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9aeebe  KAFKA-13123: close KeyValueIterator instances in example code 
and tests (#11105)
f9aeebe is described below

commit f9aeebed054000934af7555f39085f5caecf34bc
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 27 07:23:04 2021 +0800

    KAFKA-13123: close KeyValueIterator instances in example code and tests 
(#11105)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../examples/docs/DeveloperGuideTesting.java       |   9 +-
 .../streams/integration/EosIntegrationTest.java    |  11 +-
 .../KStreamAggregationIntegrationTest.java         |   9 +-
 .../integration/QueryableStateIntegrationTest.java |  26 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |  46 +--
 .../processor/internals/ProcessorTopologyTest.java | 351 +++++++--------------
 .../streams/state/internals/RocksDBStoreTest.java  | 209 ++++++------
 .../state/internals/RocksDBWindowStoreTest.java    |  17 +-
 .../state/internals/SessionKeySchemaTest.java      |   8 +
 .../state/internals/WindowKeySchemaTest.java       |  14 +-
 10 files changed, 291 insertions(+), 409 deletions(-)

diff --git 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 7b54486..41b61e3 100644
--- 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -173,10 +173,11 @@ public class DeveloperGuideTesting {
         }
 
         private void flushStore(final long timestamp) {
-            final KeyValueIterator<String, Long> it = store.all();
-            while (it.hasNext()) {
-                final KeyValue<String, Long> next = it.next();
-                context.forward(new Record<>(next.key, next.value, timestamp));
+            try (final KeyValueIterator<String, Long> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<String, Long> next = it.next();
+                    context.forward(new Record<>(next.key, next.value, 
timestamp));
+                }
             }
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 343a105..0d5c187 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -993,11 +993,12 @@ public class EosIntegrationTest {
             .getStore(300_000L, storeName, streams, 
QueryableStoreTypes.keyValueStore());
         assertNotNull(store);
 
-        final KeyValueIterator<Long, Long> it = store.all();
-        while (it.hasNext()) {
-            assertTrue(reason, expectedStoreContent.remove(it.next()));
-        }
+        try (final KeyValueIterator<Long, Long> it = store.all()) {
+            while (it.hasNext()) {
+                assertTrue(reason, expectedStoreContent.remove(it.next()));
+            }
 
-        assertTrue(reason, expectedStoreContent.isEmpty());
+            assertTrue(reason, expectedStoreContent.isEmpty());
+        }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 1b92ab5..e581903 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -923,10 +923,11 @@ public class KStreamAggregationIntegrationTest {
         final ReadOnlySessionStore<String, String> sessionStore =
             IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, 
QueryableStoreTypes.sessionStore());
 
-        final KeyValueIterator<Windowed<String>, String> bob = 
sessionStore.fetch("bob");
-        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
SessionWindow(t1, t1)), "start")));
-        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
SessionWindow(t3, t4)), "pause:resume")));
-        assertFalse(bob.hasNext());
+        try (final KeyValueIterator<Windowed<String>, String> bob = 
sessionStore.fetch("bob")) {
+            assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", 
new SessionWindow(t1, t1)), "start")));
+            assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", 
new SessionWindow(t3, t4)), "pause:resume")));
+            assertFalse(bob.hasNext());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index af954e64..15b9ea6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -846,9 +846,10 @@ public class QueryableStateIntegrationTest {
             assertEquals(Long.valueOf(batchEntry.value), 
myMapStore.get(batchEntry.key));
         }
 
-        final KeyValueIterator<String, Long> range = myMapStore.range("hello", 
"kafka");
-        while (range.hasNext()) {
-            System.out.println(range.next());
+        try (final KeyValueIterator<String, Long> range = 
myMapStore.range("hello", "kafka")) {
+            while (range.hasNext()) {
+                System.out.println(range.next());
+            }
         }
     }
 
@@ -899,9 +900,10 @@ public class QueryableStateIntegrationTest {
             IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, 
keyValueStore());
 
         int index = 0;
-        final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", Serdes.String().serializer());
-        while (range.hasNext()) {
-            assertEquals(expectedPrefixScanResult.get(index++), range.next());
+        try (final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", Serdes.String().serializer())) {
+            while (range.hasNext()) {
+                assertEquals(expectedPrefixScanResult.get(index++), 
range.next());
+            }
         }
     }
 
@@ -1234,12 +1236,14 @@ public class QueryableStateIntegrationTest {
 
     private Set<KeyValue<String, Long>> fetch(final 
ReadOnlyWindowStore<String, Long> store,
                                               final String key) {
-        final WindowStoreIterator<Long> fetch =
-            store.fetch(key, ofEpochMilli(0), 
ofEpochMilli(System.currentTimeMillis()));
-        if (fetch.hasNext()) {
-            final KeyValue<Long, Long> next = fetch.next();
-            return Collections.singleton(KeyValue.pair(key, next.value));
+        try (final WindowStoreIterator<Long> fetch =
+                 store.fetch(key, ofEpochMilli(0), 
ofEpochMilli(System.currentTimeMillis()))) {
+            if (fetch.hasNext()) {
+                final KeyValue<Long, Long> next = fetch.next();
+                return Collections.singleton(KeyValue.pair(key, next.value));
+            }
         }
+
         return Collections.emptySet();
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f4ebfdd..a897d11 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -157,10 +157,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(500);
         processor.process("john", "second");
 
-        final KeyValueIterator<Windowed<String>, Long> values =
-            sessionStore.findSessions("john", 0, 2000);
-        assertTrue(values.hasNext());
-        assertEquals(Long.valueOf(2), values.next().value);
+        try (final KeyValueIterator<Windowed<String>, Long> values =
+                 sessionStore.findSessions("john", 0, 2000)) {
+            assertTrue(values.hasNext());
+            assertEquals(Long.valueOf(2), values.next().value);
+        }
     }
 
     @Test
@@ -180,12 +181,13 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(GAP_MS / 2);
         processor.process(sessionId, "third");
 
-        final KeyValueIterator<Windowed<String>, Long> iterator =
-            sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
-        final KeyValue<Windowed<String>, Long> kv = iterator.next();
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                 sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) {
+            final KeyValue<Windowed<String>, Long> kv = iterator.next();
 
-        assertEquals(Long.valueOf(3), kv.value);
-        assertFalse(iterator.hasNext());
+            assertEquals(Long.valueOf(3), kv.value);
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -193,10 +195,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(0);
         processor.process("mel", "first");
         processor.process("mel", "second");
-        final KeyValueIterator<Windowed<String>, Long> iterator =
-            sessionStore.findSessions("mel", 0, 0);
-        assertEquals(Long.valueOf(2L), iterator.next().value);
-        assertFalse(iterator.hasNext());
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                 sessionStore.findSessions("mel", 0, 0)) {
+            assertEquals(Long.valueOf(2L), iterator.next().value);
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -240,18 +243,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process("a", "1");
 
         // first ensure it is in the store
-        final KeyValueIterator<Windowed<String>, Long> a1 =
-            sessionStore.findSessions("a", 0, 0);
-        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 
0)), 1L), a1.next());
+        try (final KeyValueIterator<Windowed<String>, Long> a1 =
+                 sessionStore.findSessions("a", 0, 0)) {
+            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), 1L), a1.next());
+        }
+
 
         context.setTime(100);
         processor.process("a", "2");
         // a1 from above should have been removed
         // should have merged session in store
-        final KeyValueIterator<Windowed<String>, Long> a2 =
-            sessionStore.findSessions("a", 0, 100);
-        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 
100)), 2L), a2.next());
-        assertFalse(a2.hasNext());
+        try (final KeyValueIterator<Windowed<String>, Long> a2 =
+                 sessionStore.findSessions("a", 0, 100)) {
+            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 100)), 2L), a2.next());
+            assertFalse(a2.hasNext());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a0dc50e..de26c7b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -84,6 +84,9 @@ public class ProcessorTopologyTest {
     private static final Serializer<String> STRING_SERIALIZER = new 
StringSerializer();
     private static final Deserializer<String> STRING_DESERIALIZER = new 
StringDeserializer();
 
+    private static final String DEFAULT_STORE_NAME = "prefixScanStore";
+    private static final String DEFAULT_PREFIX = "key";
+
     private static final String INPUT_TOPIC_1 = "input-topic-1";
     private static final String INPUT_TOPIC_2 = "input-topic-2";
     private static final String OUTPUT_TOPIC_1 = "output-topic-1";
@@ -118,6 +121,18 @@ public class ProcessorTopologyTest {
         driver = null;
     }
 
+    private List<KeyValue<String, String>> prefixScanResults(final 
KeyValueStore<String, String> store, final String prefix) {
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        try (final KeyValueIterator<String, String> prefixScan = 
store.prefixScan(prefix, Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                final KeyValue<String, String> next = prefixScan.next();
+                results.add(next);
+            }
+        }
+
+        return results;
+    }
+
     @Test
     public void testTopologyMetadata() {
         topology.addSource("source-1", "topic-1");
@@ -389,14 +404,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -411,13 +425,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -430,14 +439,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -452,13 +460,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -471,14 +474,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -493,13 +495,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -512,14 +509,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanPersistentStoreNoCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -534,13 +530,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -553,14 +544,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanPersistentStoreWithCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -575,13 +565,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -594,14 +579,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanPersistentStoreWithCachingWithLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -616,13 +600,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -635,14 +614,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -657,13 +635,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -676,14 +649,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging() 
{
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -698,13 +670,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -717,14 +684,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void 
testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -739,13 +705,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -758,14 +719,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanLruMapNoCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -780,13 +740,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -799,14 +754,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanLruMapWithCachingNoLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -821,13 +775,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -840,14 +789,13 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testPrefixScanLruMapWithCachingWithLogging() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), 
"source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -862,13 +810,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -882,14 +825,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -904,13 +846,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -924,14 +861,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -946,13 +882,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -966,14 +897,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -988,13 +918,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1008,14 +933,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1030,13 +954,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1050,14 +969,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1072,13 +990,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1092,14 +1005,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1114,13 +1026,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1134,14 +1041,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1156,13 +1062,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1176,14 +1077,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1198,13 +1098,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1218,14 +1113,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void 
testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
 Serdes.String(), Serdes.String())
+            
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
 Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1240,13 +1134,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1260,14 +1149,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingDisabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1282,13 +1170,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1302,14 +1185,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingDisabled();
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1324,13 +1206,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
@@ -1344,14 +1221,13 @@ public class ProcessorTopologyTest {
     @Deprecated // testing old PAPI
     @Test
     public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
-        final String storeName = "prefixScanStore";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), 
Serdes.String(), Serdes.String())
+            Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 
100), Serdes.String(), Serdes.String())
                 .withCachingEnabled()
                 .withLoggingEnabled(Collections.emptyMap());
         topology
             .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(DEFAULT_STORE_NAME), 
Collections.singleton(storeBuilder)), "source1")
             .addSink("counts", OUTPUT_TOPIC_1, "processor1");
 
         driver = new TopologyTestDriver(topology, props);
@@ -1366,13 +1242,8 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key1", "value4");
         assertTrue(outputTopic1.isEmpty());
 
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
-        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
-        final List<KeyValue<String, String>> results = new ArrayList<>();
-        while (prefixScan.hasNext()) {
-            final KeyValue<String, String> next = prefixScan.next();
-            results.add(next);
-        }
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
 
         assertEquals("key1", results.get(0).key);
         assertEquals("value4", results.get(0).value);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index e8a05ff..573cbe6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -52,6 +52,7 @@ import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.Cache;
@@ -89,7 +90,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.powermock.api.easymock.PowerMock.replay;
 import static org.powermock.api.easymock.PowerMock.verify;
 
@@ -407,19 +407,20 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan("prefix", stringSerializer);
-        final List<String> valuesWithPrefix = new ArrayList<>();
-        int numberOfKeysReturned = 0;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan("prefix", stringSerializer)) {
+            final List<String> valuesWithPrefix = new ArrayList<>();
+            int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
-            valuesWithPrefix.add(new String(next.value));
-            numberOfKeysReturned++;
+            while (keysWithPrefix.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+                valuesWithPrefix.add(new String(next.value));
+                numberOfKeysReturned++;
+            }
+            assertThat(numberOfKeysReturned, is(3));
+            assertThat(valuesWithPrefix.get(0), is("f"));
+            assertThat(valuesWithPrefix.get(1), is("d"));
+            assertThat(valuesWithPrefix.get(2), is("b"));
         }
-        assertThat(numberOfKeysReturned, is(3));
-        assertThat(valuesWithPrefix.get(0), is("f"));
-        assertThat(valuesWithPrefix.get(1), is("d"));
-        assertThat(valuesWithPrefix.get(2), is("b"));
     }
 
     @Test
@@ -441,15 +442,16 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = 
rocksDBStore.prefixScan("abcd", stringSerializer);
-        int numberOfKeysReturned = 0;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = 
rocksDBStore.prefixScan("abcd", stringSerializer)) {
+            int numberOfKeysReturned = 0;
 
-        while (keysWithPrefixAsabcd.hasNext()) {
-            keysWithPrefixAsabcd.next().key.get();
-            numberOfKeysReturned++;
-        }
+            while (keysWithPrefixAsabcd.hasNext()) {
+                keysWithPrefixAsabcd.next().key.get();
+                numberOfKeysReturned++;
+            }
 
-        assertThat(numberOfKeysReturned, is(1));
+            assertThat(numberOfKeysReturned, is(1));
+        }
     }
 
     @Test
@@ -472,21 +474,22 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan(prefix, stringSerializer);
-        final List<String> valuesWithPrefix = new ArrayList<>();
-        int numberOfKeysReturned = 0;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan(prefix, stringSerializer)) {
+            final List<String> valuesWithPrefix = new ArrayList<>();
+            int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
-            valuesWithPrefix.add(new String(next.value));
-            numberOfKeysReturned++;
-        }
+            while (keysWithPrefix.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+                valuesWithPrefix.add(new String(next.value));
+                numberOfKeysReturned++;
+            }
 
-        assertThat(numberOfKeysReturned, is(numMatches));
-        if (numMatches == 2) {
-            assertThat(valuesWithPrefix.get(0), either(is("a")).or(is("b")));
-        } else {
-            assertThat(valuesWithPrefix.get(0), is("a"));
+            assertThat(numberOfKeysReturned, is(numMatches));
+            if (numMatches == 2) {
+                assertThat(valuesWithPrefix.get(0), 
either(is("a")).or(is("b")));
+            } else {
+                assertThat(valuesWithPrefix.get(0), is("a"));
+            }
         }
     }
 
@@ -506,14 +509,15 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan("d", stringSerializer);
-        int numberOfKeysReturned = 0;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
rocksDBStore.prefixScan("d", stringSerializer)) {
+            int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            keysWithPrefix.next();
-            numberOfKeysReturned++;
+            while (keysWithPrefix.hasNext()) {
+                keysWithPrefix.next();
+                numberOfKeysReturned++;
+            }
+            assertThat(numberOfKeysReturned, is(0));
         }
-        assertThat(numberOfKeysReturned, is(0));
     }
 
     @Test
@@ -562,14 +566,15 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
-        final Set<String> keys = new HashSet<>();
+        try (final KeyValueIterator<Bytes, byte[]> iterator = 
rocksDBStore.all()) {
+            final Set<String> keys = new HashSet<>();
 
-        while (iterator.hasNext()) {
-            keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
-        }
+            while (iterator.hasNext()) {
+                keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
+            }
 
-        assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+            assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+        }
     }
 
     @Test
@@ -586,30 +591,31 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
-        final Set<String> keys = new HashSet<>();
+        try (final KeyValueIterator<Bytes, byte[]> iterator = 
rocksDBStore.all()) {
+            final Set<String> keys = new HashSet<>();
 
-        while (iterator.hasNext()) {
-            keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
-        }
-
-        assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
+            while (iterator.hasNext()) {
+                keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
+            }
 
-        assertEquals(
-            "restored",
-            stringDeserializer.deserialize(
-                null,
-                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, 
"1")))));
-        assertEquals(
-            "b",
-            stringDeserializer.deserialize(
-                null,
-                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, 
"2")))));
-        assertEquals(
-            "c",
-            stringDeserializer.deserialize(
-                null,
-                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, 
"3")))));
+            assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
+
+            assertEquals(
+                "restored",
+                stringDeserializer.deserialize(
+                    null,
+                    rocksDBStore.get(new 
Bytes(stringSerializer.serialize(null, "1")))));
+            assertEquals(
+                "b",
+                stringDeserializer.deserialize(
+                    null,
+                    rocksDBStore.get(new 
Bytes(stringSerializer.serialize(null, "2")))));
+            assertEquals(
+                "c",
+                stringDeserializer.deserialize(
+                    null,
+                    rocksDBStore.get(new 
Bytes(stringSerializer.serialize(null, "3")))));
+        }
     }
 
     @Test
@@ -644,14 +650,15 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
-        final Set<String> keys = new HashSet<>();
+        try (final KeyValueIterator<Bytes, byte[]> iterator = 
rocksDBStore.all()) {
+            final Set<String> keys = new HashSet<>();
 
-        while (iterator.hasNext()) {
-            keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
-        }
+            while (iterator.hasNext()) {
+                keys.add(stringDeserializer.deserialize(null, 
iterator.next().key.get()));
+            }
 
-        assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+            assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+        }
     }
 
     @Test
@@ -872,10 +879,11 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
-        final KeyValueIterator<Integer, String> range = store.range(1, 2);
-        assertEquals("hi", range.next().value);
-        assertEquals("goodbye", range.next().value);
-        assertFalse(range.hasNext());
+        try (final KeyValueIterator<Integer, String> range = store.range(1, 
2)) {
+            assertEquals("hi", range.next().value);
+            assertEquals("goodbye", range.next().value);
+            assertFalse(range.hasNext());
+        }
     }
 
     @Test
@@ -883,10 +891,11 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
-        final KeyValueIterator<Integer, String> range = store.all();
-        assertEquals("hi", range.next().value);
-        assertEquals("goodbye", range.next().value);
-        assertFalse(range.hasNext());
+        try (final KeyValueIterator<Integer, String> range = store.all()) {
+            assertEquals("hi", range.next().value);
+            assertEquals("goodbye", range.next().value);
+            assertFalse(range.hasNext());
+        }
     }
 
     @Test
@@ -894,40 +903,18 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
-        final KeyValueIterator<Integer, String> iteratorOne = store.range(1, 
5);
-        final KeyValueIterator<Integer, String> iteratorTwo = store.range(1, 
4);
-
-        assertTrue(iteratorOne.hasNext());
-        assertTrue(iteratorTwo.hasNext());
+        try (final KeyValueIterator<Integer, String> iteratorOne = 
store.range(1, 5);
+             final KeyValueIterator<Integer, String> iteratorTwo = 
store.range(1, 4)) {
 
-        store.close();
+            assertTrue(iteratorOne.hasNext());
+            assertTrue(iteratorTwo.hasNext());
 
-        try {
-            iteratorOne.hasNext();
-            fail("should have thrown InvalidStateStoreException on closed 
store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
-        }
+            store.close();
 
-        try {
-            iteratorOne.next();
-            fail("should have thrown InvalidStateStoreException on closed 
store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
-        }
-
-        try {
-            iteratorTwo.hasNext();
-            fail("should have thrown InvalidStateStoreException on closed 
store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
-        }
-
-        try {
-            iteratorTwo.next();
-            fail("should have thrown InvalidStateStoreException on closed 
store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
+            Assertions.assertThrows(InvalidStateStoreException.class, () -> 
iteratorOne.hasNext());
+            Assertions.assertThrows(InvalidStateStoreException.class, () -> 
iteratorOne.next());
+            Assertions.assertThrows(InvalidStateStoreException.class, () -> 
iteratorTwo.hasNext());
+            Assertions.assertThrows(InvalidStateStoreException.class, () -> 
iteratorTwo.next());
         }
     }
         
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 3bb9f14..7da5245 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -79,16 +79,17 @@ public class RocksDBWindowStoreTest extends 
AbstractWindowBytesStoreTest {
 
         windowStore.put(1, "three", currentTime);
 
-        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, 
currentTime);
+        try (final WindowStoreIterator<String> iterator = windowStore.fetch(1, 
0L, currentTime)) {
 
-        // roll to the next segment that will close the first
-        currentTime = currentTime + SEGMENT_INTERVAL;
-        windowStore.put(1, "four", currentTime);
+            // roll to the next segment that will close the first
+            currentTime = currentTime + SEGMENT_INTERVAL;
+            windowStore.put(1, "four", currentTime);
 
-        // should only have 2 values as the first segment is no longer open
-        assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), iterator.next());
-        assertEquals(new KeyValue<>(2 * SEGMENT_INTERVAL, "three"), 
iterator.next());
-        assertFalse(iterator.hasNext());
+            // should only have 2 values as the first segment is no longer open
+            assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), 
iterator.next());
+            assertEquals(new KeyValue<>(2 * SEGMENT_INTERVAL, "three"), 
iterator.next());
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index cd34fb0..40b06c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -54,6 +55,13 @@ public class SessionKeySchemaTest {
     private final SessionKeySchema sessionKeySchema = new SessionKeySchema();
     private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
 
+    @After
+    public void after() {
+        if (iterator != null) {
+            iterator.close();
+        }
+    }
+
     @Before
     public void before() {
         final List<KeyValue<Bytes, Integer>> keys = 
Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new 
Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index bc17abf..8f0ca83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -63,14 +63,16 @@ public class WindowKeySchemaTest {
             KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new 
Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(10, 20)), 4), 4),
             KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new 
Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(10, 20)), 5), 5),
             KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new 
Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(10, 20)), 6), 6));
-        final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new 
DelegatingPeekingKeyValueIterator<>("foo", new 
KeyValueIteratorStub<>(keys.iterator()));
+        try (final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator 
= new DelegatingPeekingKeyValueIterator<>("foo", new 
KeyValueIteratorStub<>(keys.iterator()))) {
 
-        final HasNextCondition hasNextCondition = 
windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
-        final List<Integer> results = new ArrayList<>();
-        while (hasNextCondition.hasNext(iterator)) {
-            results.add(iterator.next().value);
+            final HasNextCondition hasNextCondition = 
windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
+            final List<Integer> results = new ArrayList<>();
+            while (hasNextCondition.hasNext(iterator)) {
+                results.add(iterator.next().value);
+            }
+
+            assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
         }
-        assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
     }
 
     @Test

Reply via email to