This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 818cbfb  KAFKA-13125: close KeyValueIterator instances in internals 
tests (part 2) (#11107)
818cbfb is described below

commit 818cbfba6ddf8252b7da314bbaac74201951dfb3
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 27 07:26:02 2021 +0800

    KAFKA-13125: close KeyValueIterator instances in internals tests (part 2) 
(#11107)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../CachingPersistentWindowStoreTest.java          | 361 +++++++++++----------
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  15 +-
 .../CompositeReadOnlyKeyValueStoreTest.java        |  48 +--
 .../CompositeReadOnlySessionStoreTest.java         |  12 +-
 .../CompositeReadOnlyWindowStoreTest.java          |  80 +++--
 .../state/internals/InMemoryKeyValueStoreTest.java |  48 +--
 .../state/internals/InMemoryWindowStoreTest.java   |  15 +-
 ...dSortedCacheKeyValueBytesStoreIteratorTest.java |  39 ++-
 8 files changed, 337 insertions(+), 281 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 7c316f4..023d69a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -165,10 +165,11 @@ public class CachingPersistentWindowStoreTest {
                     this.store = (WindowStore<String, String>) 
processorContext.getStateStore("store-name");
                     int count = 0;
 
-                    final KeyValueIterator<Windowed<String>, String> all = 
store.all();
-                    while (all.hasNext()) {
-                        count++;
-                        all.next();
+                    try (final KeyValueIterator<Windowed<String>, String> all 
= store.all()) {
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
                     }
 
                     assertThat(count, equalTo(0));
@@ -178,11 +179,13 @@ public class CachingPersistentWindowStoreTest {
                 public KeyValue<String, String> transform(final String key, 
final String value) {
                     int count = 0;
 
-                    final KeyValueIterator<Windowed<String>, String> all = 
store.all();
-                    while (all.hasNext()) {
-                        count++;
-                        all.next();
+                    try (final KeyValueIterator<Windowed<String>, String> all 
= store.all()) {
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
                     }
+
                     assertThat(count, equalTo(numRecordsProcessed));
 
                     store.put(value, value, context.timestamp());
@@ -245,13 +248,14 @@ public class CachingPersistentWindowStoreTest {
         assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null));
         assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null));
 
-        final WindowStoreIterator<byte[]> a = 
cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10));
-        final WindowStoreIterator<byte[]> b = 
cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
-        verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
-        verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
-        assertFalse(a.hasNext());
-        assertFalse(b.hasNext());
-        assertEquals(2, cache.size());
+        try (final WindowStoreIterator<byte[]> a = 
cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10));
+             final WindowStoreIterator<byte[]> b = 
cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) {
+            verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
+            verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
+            assertFalse(a.hasNext());
+            assertFalse(b.hasNext());
+            assertEquals(2, cache.size());
+        }
     }
 
     private void verifyKeyValue(final KeyValue<Long, byte[]> next,
@@ -278,18 +282,19 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
-            cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), 
ofEpochMilli(10));
-        verifyWindowedKeyValue(
-            iterator.next(),
-            new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "a");
-        verifyWindowedKeyValue(
-            iterator.next(),
-            new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "b");
-        assertFalse(iterator.hasNext());
-        assertEquals(2, cache.size());
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetch(bytesKey("a"), bytesKey("b"), 
ofEpochMilli(10), ofEpochMilli(10))) {
+            verifyWindowedKeyValue(
+                iterator.next(),
+                new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "a");
+            verifyWindowedKeyValue(
+                iterator.next(),
+                new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "b");
+            assertFalse(iterator.hasNext());
+            assertEquals(2, cache.size());
+        }
     }
 
     @Test
@@ -303,15 +308,16 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.all();
-        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
-        for (final String s : array) {
-            verifyWindowedKeyValue(
-                iterator.next(),
-                new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-                s);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.all()) {
+            final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+            for (final String s : array) {
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(s), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                    s);
+            }
+            assertFalse(iterator.hasNext());
         }
-        assertFalse(iterator.hasNext());
     }
 
     @Test
@@ -325,15 +331,16 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.backwardAll();
-        final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
-        for (final String s : array) {
-            verifyWindowedKeyValue(
-                iterator.next(),
-                new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-                s);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.backwardAll()) {
+            final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
+            for (final String s : array) {
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(s), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                    s);
+            }
+            assertFalse(iterator.hasNext());
         }
-        assertFalse(iterator.hasNext());
     }
 
     @Test
@@ -343,38 +350,41 @@ public class CachingPersistentWindowStoreTest {
             cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
         }
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
-            cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7));
-        for (int i = 0; i < array.length; i++) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7))) {
+            for (int i = 0; i < array.length; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator.hasNext());
         }
-        assertFalse(iterator.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
-            cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4));
-        for (int i = 2; i <= 4; i++) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator1.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+                 cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4))) {
+            for (int i = 2; i <= 4; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator1.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator1.hasNext());
         }
-        assertFalse(iterator1.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
-            cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7));
-        for (int i = 5; i <= 7; i++) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator2.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+                 cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7))) {
+            for (int i = 5; i <= 7; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator2.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator2.hasNext());
         }
-        assertFalse(iterator2.hasNext());
     }
 
     @Test
@@ -384,53 +394,57 @@ public class CachingPersistentWindowStoreTest {
             cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
         }
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
-            cachingStore.backwardFetchAll(ofEpochMilli(0), ofEpochMilli(7));
-        for (int i = array.length - 1; i >= 0; i--) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetchAll(ofEpochMilli(0), 
ofEpochMilli(7))) {
+            for (int i = array.length - 1; i >= 0; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator.hasNext());
         }
-        assertFalse(iterator.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
-            cachingStore.backwardFetchAll(ofEpochMilli(2), ofEpochMilli(4));
-        for (int i = 4; i >= 2; i--) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator1.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+                 cachingStore.backwardFetchAll(ofEpochMilli(2), 
ofEpochMilli(4))) {
+            for (int i = 4; i >= 2; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator1.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator1.hasNext());
         }
-        assertFalse(iterator1.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
-            cachingStore.backwardFetchAll(ofEpochMilli(5), ofEpochMilli(7));
-        for (int i = 7; i >= 5; i--) {
-            final String str = array[i];
-            verifyWindowedKeyValue(
-                iterator2.next(),
-                new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
-                str);
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+                 cachingStore.backwardFetchAll(ofEpochMilli(5), 
ofEpochMilli(7))) {
+            for (int i = 7; i >= 5; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator2.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator2.hasNext());
         }
-        assertFalse(iterator2.hasNext());
     }
 
     @Test
     public void shouldFlushEvictedItemsIntoUnderlyingStore() {
         final int added = addItemsToCache();
         // all dirty entries should have been flushed
-        final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch(
+        try (final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch(
             Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)),
             DEFAULT_TIMESTAMP,
-            DEFAULT_TIMESTAMP);
-        final KeyValue<Bytes, byte[]> next = iter.next();
-        assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(next.key));
-        assertArrayEquals("0".getBytes(), next.value);
-        assertFalse(iter.hasNext());
-        assertEquals(added - 1, cache.size());
+            DEFAULT_TIMESTAMP)) {
+            final KeyValue<Bytes, byte[]> next = iter.next();
+            assertEquals(DEFAULT_TIMESTAMP, 
keySchema.segmentTimestamp(next.key));
+            assertArrayEquals("0".getBytes(), next.value);
+            assertFalse(iter.hasNext());
+            assertEquals(added - 1, cache.size());
+        }
     }
 
     @Test
@@ -515,10 +529,11 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
-        final WindowStoreIterator<byte[]> fetch =
-            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), 
ofEpochMilli(DEFAULT_TIMESTAMP));
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
-        assertFalse(fetch.hasNext());
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
+            assertFalse(fetch.hasNext());
+        }
     }
 
     @Test
@@ -526,11 +541,12 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
 
-        final WindowStoreIterator<byte[]> fetch =
-            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), 
ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
-        assertFalse(fetch.hasNext());
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            assertFalse(fetch.hasNext());
+        }
     }
 
     @Test
@@ -538,11 +554,12 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
 
-        final WindowStoreIterator<byte[]> fetch =
-            cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
-        assertFalse(fetch.hasNext());
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            assertFalse(fetch.hasNext());
+        }
     }
 
     @Test
@@ -550,11 +567,12 @@ public class CachingPersistentWindowStoreTest {
         final Bytes key = Bytes.wrap("1".getBytes());
         bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
-        final WindowStoreIterator<byte[]> fetch =
-            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), 
ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
-        assertFalse(fetch.hasNext());
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            assertFalse(fetch.hasNext());
+        }
     }
 
     @Test
@@ -562,11 +580,12 @@ public class CachingPersistentWindowStoreTest {
         final Bytes key = Bytes.wrap("1".getBytes());
         bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
-        final WindowStoreIterator<byte[]> fetch =
-            cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
-        verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
-        assertFalse(fetch.hasNext());
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            assertFalse(fetch.hasNext());
+        }
     }
 
     @Test
@@ -575,17 +594,18 @@ public class CachingPersistentWindowStoreTest {
         bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
-            cachingStore.fetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyWindowedKeyValue(
-            fetchRange.next(),
-            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "a");
-        verifyWindowedKeyValue(
-            fetchRange.next(),
-            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
-            "b");
-        assertFalse(fetchRange.hasNext());
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+                 cachingStore.fetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "a");
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+                "b");
+            assertFalse(fetchRange.hasNext());
+        }
     }
 
     @Test
@@ -594,17 +614,18 @@ public class CachingPersistentWindowStoreTest {
         bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
-            cachingStore.backwardFetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyWindowedKeyValue(
-            fetchRange.next(),
-            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
-            "b");
-        verifyWindowedKeyValue(
-            fetchRange.next(),
-            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "a");
-        assertFalse(fetchRange.hasNext());
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+                 cachingStore.backwardFetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+                "b");
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "a");
+            assertFalse(fetchRange.hasNext());
+        }
     }
 
     @Test
@@ -748,13 +769,14 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2);
         cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3);
 
-        final WindowStoreIterator<byte[]> singleKeyIterator = 
cachingStore.fetch(bytesKey("aa"), 0L, 5L);
-        final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = 
cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L);
+        try (final WindowStoreIterator<byte[]> singleKeyIterator = 
cachingStore.fetch(bytesKey("aa"), 0L, 5L);
+             final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator 
= cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L)) {
 
-        assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
-        assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
-        assertFalse(singleKeyIterator.hasNext());
-        assertFalse(keyRangeIterator.hasNext());
+            assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
+            assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
+            assertFalse(singleKeyIterator.hasNext());
+            assertFalse(keyRangeIterator.hasNext());
+        }
     }
 
     @Test
@@ -764,15 +786,16 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2);
         cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3);
 
-        final WindowStoreIterator<byte[]> singleKeyIterator =
-            cachingStore.backwardFetch(bytesKey("aa"), 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
-        final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator =
-            cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
+        try (final WindowStoreIterator<byte[]> singleKeyIterator =
+                 cachingStore.backwardFetch(bytesKey("aa"), 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
+             final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator =
+                 cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L))) {
 
-        assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
-        assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
-        assertFalse(singleKeyIterator.hasNext());
-        assertFalse(keyRangeIterator.hasNext());
+            assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
+            assertEquals(stringFrom(singleKeyIterator.next().value), 
stringFrom(keyRangeIterator.next().value));
+            assertFalse(singleKeyIterator.hasNext());
+            assertFalse(keyRangeIterator.hasNext());
+        }
     }
 
     @Test
@@ -805,8 +828,8 @@ public class CachingPersistentWindowStoreTest {
         final Bytes keyFrom = 
Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
         final Bytes keyTo = 
Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
 
-        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingWindowStore.class)) {
-            final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.fetch(keyFrom, keyTo, 0L, 10L);
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingWindowStore.class);
+             final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.fetch(keyFrom, keyTo, 0L, 10L)) {
             assertFalse(iterator.hasNext());
 
             final List<String> messages = appender.getMessages();
@@ -825,9 +848,9 @@ public class CachingPersistentWindowStoreTest {
         final Bytes keyFrom = 
Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
         final Bytes keyTo = 
Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
 
-        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingWindowStore.class)) {
-            final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
-                cachingStore.backwardFetch(keyFrom, keyTo, 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L));
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingWindowStore.class);
+             final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(keyFrom, keyTo, 
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L))) {
             assertFalse(iterator.hasNext());
 
             final List<String> messages = appender.getMessages();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5f524fa..6e8e979 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -206,17 +206,20 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     public void shouldGetRecordsWithPrefixKey() {
         store.put(hi, there);
         store.put(Bytes.increment(hi), world);
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan(hi.toString(), new StringSerializer());
+
         final List<Bytes> keys = new ArrayList<>();
         final List<Bytes> values = new ArrayList<>();
         int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
-            keys.add(next.key);
-            values.add(Bytes.wrap(next.value));
-            numberOfKeysReturned++;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
store.prefixScan(hi.toString(), new StringSerializer())) {
+            while (keysWithPrefix.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+                keys.add(next.key);
+                values.add(Bytes.wrap(next.value));
+                numberOfKeysReturned++;
+            }
         }
+
         assertThat(numberOfKeysReturned, is(1));
         assertThat(keys, is(Collections.singletonList(hi)));
         assertThat(values, is(Collections.singletonList(Bytes.wrap(there))));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 652694b..92182c5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -148,63 +148,71 @@ public class CompositeReadOnlyKeyValueStoreTest {
     @Test
     public void shouldThrowNoSuchElementExceptionWhileNext() {
         stubOneUnderlying.put("a", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b");
-        keyValueIterator.next();
-        assertThrows(NoSuchElementException.class, keyValueIterator::next);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b")) {
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, keyValueIterator::next);
+        }
     }
 
     @Test
     public void shouldThrowNoSuchElementExceptionWhilePeekNext() {
         stubOneUnderlying.put("a", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b");
-        keyValueIterator.next();
-        assertThrows(NoSuchElementException.class, 
keyValueIterator::peekNextKey);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b")) {
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, 
keyValueIterator::peekNextKey);
+        }
     }
 
     @Test
     public void shouldThrowNoSuchElementExceptionWhileNextForPrefixScan() {
         stubOneUnderlying.put("a", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer());
-        keyValueIterator.next();
-        assertThrows(NoSuchElementException.class, keyValueIterator::next);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer())) {
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, keyValueIterator::next);
+        }
     }
 
     @Test
     public void shouldThrowNoSuchElementExceptionWhilePeekNextForPrefixScan() {
         stubOneUnderlying.put("a", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer());
-        keyValueIterator.next();
-        assertThrows(NoSuchElementException.class, 
keyValueIterator::peekNextKey);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer())) {
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, 
keyValueIterator::peekNextKey);
+        }
     }
 
     @Test
     public void shouldThrowUnsupportedOperationExceptionWhileRemove() {
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.all();
-        assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.all()) {
+            assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        }
     }
 
     @Test
     public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() {
         stubOneUnderlying.put("a", "1");
         stubOneUnderlying.put("b", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.reverseRange("a", "b");
-        assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.reverseRange("a", "b")) {
+            assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        }
     }
 
     @Test
     public void shouldThrowUnsupportedOperationExceptionWhileRange() {
         stubOneUnderlying.put("a", "1");
         stubOneUnderlying.put("b", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b");
-        assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.range("a", "b")) {
+            assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        }
     }
 
     @Test
     public void shouldThrowUnsupportedOperationExceptionWhilePrefixScan() {
         stubOneUnderlying.put("a", "1");
         stubOneUnderlying.put("b", "1");
-        final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer());
-        assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        try (final KeyValueIterator<String, String> keyValueIterator = 
theStore.prefixScan("a", new StringSerializer())) {
+            assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index b3da4c6..66ae243 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -76,8 +76,9 @@ public class CompositeReadOnlySessionStoreTest {
 
     @Test
     public void shouldReturnEmptyIteratorIfNoData() {
-        final KeyValueIterator<Windowed<String>, Long> result = 
sessionStore.fetch("b");
-        assertFalse(result.hasNext());
+        try (final KeyValueIterator<Windowed<String>, Long> result = 
sessionStore.fetch("b")) {
+            assertFalse(result.hasNext());
+        }
     }
 
     @Test
@@ -104,9 +105,10 @@ public class CompositeReadOnlySessionStoreTest {
         otherUnderlyingStore.put(new Windowed<>("foo", new SessionWindow(10, 
10)), 10L);
         underlyingSessionStore.put(expectedKey, 1L);
 
-        final KeyValueIterator<Windowed<String>, Long> result = 
sessionStore.fetch("foo");
-        assertEquals(KeyValue.pair(expectedKey, 1L), result.next());
-        assertFalse(result.hasNext());
+        try (final KeyValueIterator<Windowed<String>, Long> result = 
sessionStore.fetch("foo")) {
+            assertEquals(KeyValue.pair(expectedKey, 1L), result.next());
+            assertFalse(result.hasNext());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 7bb7268..3c486c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -80,13 +80,14 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.put("my-key", "my-value", 0L);
         underlyingWindowStore.put("my-key", "my-later-value", 10L);
 
-        final WindowStoreIterator<String> iterator =
-            windowStore.fetch("my-key", ofEpochMilli(0L), ofEpochMilli(25L));
-        final List<KeyValue<Long, String>> results = 
StreamsTestUtils.toList(iterator);
+        try (final WindowStoreIterator<String> iterator =
+                 windowStore.fetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L))) {
+            final List<KeyValue<Long, String>> results = 
StreamsTestUtils.toList(iterator);
 
-        assertEquals(
-            asList(new KeyValue<>(0L, "my-value"), new KeyValue<>(10L, 
"my-later-value")),
-            results);
+            assertEquals(
+                asList(new KeyValue<>(0L, "my-value"), new KeyValue<>(10L, 
"my-later-value")),
+                results);
+        }
     }
 
     @Test
@@ -94,27 +95,30 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.put("my-key", "my-value", 0L);
         underlyingWindowStore.put("my-key", "my-later-value", 10L);
 
-        final WindowStoreIterator<String> iterator =
-            windowStore.backwardFetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L));
-        final List<KeyValue<Long, String>> results = 
StreamsTestUtils.toList(iterator);
+        try (final WindowStoreIterator<String> iterator =
+                 windowStore.backwardFetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L))) {
+            final List<KeyValue<Long, String>> results = 
StreamsTestUtils.toList(iterator);
 
-        assertEquals(
-            asList(new KeyValue<>(10L, "my-later-value"), new KeyValue<>(0L, 
"my-value")),
-            results);
+            assertEquals(
+                asList(new KeyValue<>(10L, "my-later-value"), new 
KeyValue<>(0L, "my-value")),
+                results);
+        }
     }
 
     @Test
     public void shouldReturnEmptyIteratorIfNoData() {
-        final WindowStoreIterator<String> iterator =
-            windowStore.fetch("my-key", ofEpochMilli(0L), ofEpochMilli(25L));
-        assertFalse(iterator.hasNext());
+        try (final WindowStoreIterator<String> iterator =
+                 windowStore.fetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L))) {
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
     public void shouldReturnBackwardEmptyIteratorIfNoData() {
-        final WindowStoreIterator<String> iterator =
-            windowStore.backwardFetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L));
-        assertFalse(iterator.hasNext());
+        try (final WindowStoreIterator<String> iterator =
+                 windowStore.backwardFetch("my-key", ofEpochMilli(0L), 
ofEpochMilli(25L))) {
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -251,10 +255,11 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator =
-            store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
+        try (final WindowStoreIterator<Object> windowStoreIterator =
+                 store.backwardFetch("key", ofEpochMilli(1), 
ofEpochMilli(10))) {
 
-        Assert.assertFalse(windowStoreIterator.hasNext());
+            Assert.assertFalse(windowStoreIterator.hasNext());
+        }
     }
 
     @Test
@@ -268,10 +273,11 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator =
-            store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
+        try (final WindowStoreIterator<Object> windowStoreIterator =
+                 store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
 
-        Assert.assertFalse(windowStoreIterator.hasNext());
+            Assert.assertFalse(windowStoreIterator.hasNext());
+        }
     }
 
     @Test
@@ -285,8 +291,9 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator = 
store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
-        assertThrows(NoSuchElementException.class, 
windowStoreIterator::peekNextKey);
+        try (final WindowStoreIterator<Object> windowStoreIterator = 
store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+            assertThrows(NoSuchElementException.class, 
windowStoreIterator::peekNextKey);
+        }
     }
 
 
@@ -301,9 +308,10 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator =
-            store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
-        assertThrows(NoSuchElementException.class, 
windowStoreIterator::peekNextKey);
+        try (final WindowStoreIterator<Object> windowStoreIterator =
+                 store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+            assertThrows(NoSuchElementException.class, 
windowStoreIterator::peekNextKey);
+        }
     }
 
     @Test
@@ -317,9 +325,10 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator =
-            store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
-        assertThrows(NoSuchElementException.class, windowStoreIterator::next);
+        try (final WindowStoreIterator<Object> windowStoreIterator =
+                 store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+            assertThrows(NoSuchElementException.class, 
windowStoreIterator::next);
+        }
     }
 
     @Test
@@ -333,9 +342,10 @@ public class CompositeReadOnlyWindowStoreTest {
             QueryableStoreTypes.windowStore(),
             "foo"
         );
-        final WindowStoreIterator<Object> windowStoreIterator =
-            store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
-        assertThrows(NoSuchElementException.class, windowStoreIterator::next);
+        try (final WindowStoreIterator<Object> windowStoreIterator =
+                 store.backwardFetch("key", ofEpochMilli(1), 
ofEpochMilli(10))) {
+            assertThrows(NoSuchElementException.class, 
windowStoreIterator::next);
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 87a2063..d67d665 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -127,15 +127,17 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         byteStore.putAll(entries);
         byteStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("prefix", stringSerializer);
         final List<String> valuesWithPrefix = new ArrayList<>();
         int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
-            valuesWithPrefix.add(new String(next.value));
-            numberOfKeysReturned++;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("prefix", stringSerializer)) {
+            while (keysWithPrefix.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+                valuesWithPrefix.add(new String(next.value));
+                numberOfKeysReturned++;
+            }
         }
+
         assertThat(numberOfKeysReturned, is(3));
         assertThat(valuesWithPrefix.get(0), is("f"));
         assertThat(valuesWithPrefix.get(1), is("d"));
@@ -160,15 +162,16 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         byteStore.putAll(entries);
         byteStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = 
byteStore.prefixScan("abcd", stringSerializer);
-        int numberOfKeysReturned = 0;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = 
byteStore.prefixScan("abcd", stringSerializer)) {
+            int numberOfKeysReturned = 0;
 
-        while (keysWithPrefixAsabcd.hasNext()) {
-            keysWithPrefixAsabcd.next().key.get();
-            numberOfKeysReturned++;
-        }
+            while (keysWithPrefixAsabcd.hasNext()) {
+                keysWithPrefixAsabcd.next().key.get();
+                numberOfKeysReturned++;
+            }
 
-        assertThat(numberOfKeysReturned, is(1));
+            assertThat(numberOfKeysReturned, is(1));
+        }
     }
 
     @Test
@@ -188,14 +191,15 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         byteStore.putAll(entries);
         byteStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan(prefix, stringSerializer);
         final List<String> valuesWithPrefix = new ArrayList<>();
         int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
-            valuesWithPrefix.add(new String(next.value));
-            numberOfKeysReturned++;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan(prefix, stringSerializer)) {
+            while (keysWithPrefix.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+                valuesWithPrefix.add(new String(next.value));
+                numberOfKeysReturned++;
+            }
         }
 
         assertThat(numberOfKeysReturned, is(1));
@@ -217,13 +221,15 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         byteStore.putAll(entries);
         byteStore.flush();
 
-        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("bb", stringSerializer);
         int numberOfKeysReturned = 0;
 
-        while (keysWithPrefix.hasNext()) {
-            keysWithPrefix.next();
-            numberOfKeysReturned++;
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = 
byteStore.prefixScan("bb", stringSerializer)) {
+            while (keysWithPrefix.hasNext()) {
+                keysWithPrefix.next();
+                numberOfKeysReturned++;
+            }
         }
+
         assertThat(numberOfKeysReturned, is(0));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index b18e8a7..5150839 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -75,13 +75,14 @@ public class InMemoryWindowStoreTest extends 
AbstractWindowBytesStoreTest {
             serdes.rawValue("three")));
 
         context.restore(STORE_NAME, restorableEntries);
-        final KeyValueIterator<Windowed<Integer>, String> iterator = 
windowStore
-            .fetchAll(0L, 2 * WINDOW_SIZE);
-
-        assertEquals(windowedPair(1, "one", 0L), iterator.next());
-        assertEquals(windowedPair(2, "two", WINDOW_SIZE), iterator.next());
-        assertEquals(windowedPair(3, "three", 2 * WINDOW_SIZE), 
iterator.next());
-        assertFalse(iterator.hasNext());
+        try (final KeyValueIterator<Windowed<Integer>, String> iterator = 
windowStore
+            .fetchAll(0L, 2 * WINDOW_SIZE)) {
+
+            assertEquals(windowedPair(1, "one", 0L), iterator.next());
+            assertEquals(windowedPair(2, "two", WINDOW_SIZE), iterator.next());
+            assertEquals(windowedPair(3, "three", 2 * WINDOW_SIZE), 
iterator.next());
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index b8075d4..1716ac1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -99,9 +99,10 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest 
{
         final byte[][] bytes = {{0}, {1}};
         store.put(Bytes.wrap(bytes[0]), bytes[0]);
         cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
-        final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator();
-        assertArrayEquals(bytes[0], iterator.next().key.get());
-        assertFalse(iterator.hasNext());
+        try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator()) {
+            assertArrayEquals(bytes[0], iterator.next().key.get());
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -109,9 +110,10 @@ public class 
MergedSortedCacheKeyValueBytesStoreIteratorTest {
         final byte[][] bytes = {{0}, {1}};
         cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
         store.put(Bytes.wrap(bytes[1]), bytes[1]);
-        final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator();
-        assertArrayEquals(bytes[1], iterator.next().key.get());
-        assertFalse(iterator.hasNext());
+        try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator()) {
+            assertArrayEquals(bytes[1], iterator.next().key.get());
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -119,8 +121,9 @@ public class 
MergedSortedCacheKeyValueBytesStoreIteratorTest {
         final byte[][] bytes = {{0}};
         cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
         store.put(Bytes.wrap(bytes[0]), bytes[0]);
-        final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator();
-        assertFalse(iterator.hasNext());
+        try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator()) {
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test
@@ -157,16 +160,16 @@ public class 
MergedSortedCacheKeyValueBytesStoreIteratorTest {
         cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
         cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
 
-        final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator();
-        assertArrayEquals(bytes[0], iterator.next().key.get());
-        assertArrayEquals(bytes[4], iterator.next().key.get());
-        assertArrayEquals(bytes[5], iterator.next().key.get());
-        assertArrayEquals(bytes[6], iterator.next().key.get());
-        assertArrayEquals(bytes[7], iterator.next().key.get());
-        assertArrayEquals(bytes[9], iterator.next().key.get());
-        assertArrayEquals(bytes[10], iterator.next().key.get());
-        assertFalse(iterator.hasNext());
-
+        try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = 
createIterator()) {
+            assertArrayEquals(bytes[0], iterator.next().key.get());
+            assertArrayEquals(bytes[4], iterator.next().key.get());
+            assertArrayEquals(bytes[5], iterator.next().key.get());
+            assertArrayEquals(bytes[6], iterator.next().key.get());
+            assertArrayEquals(bytes[7], iterator.next().key.get());
+            assertArrayEquals(bytes[9], iterator.next().key.get());
+            assertArrayEquals(bytes[10], iterator.next().key.get());
+            assertFalse(iterator.hasNext());
+        }
     }
 
     @Test

Reply via email to