This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 818cbfb KAFKA-13125: close KeyValueIterator instances in internals tests (part 2) (#11107)
818cbfb is described below
commit 818cbfba6ddf8252b7da314bbaac74201951dfb3
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 27 07:26:02 2021 +0800
KAFKA-13125: close KeyValueIterator instances in internals tests (part 2) (#11107)
Reviewers: Matthias J. Sax <[email protected]>
---
.../CachingPersistentWindowStoreTest.java | 361 +++++++++++----------
.../ChangeLoggingKeyValueBytesStoreTest.java | 15 +-
.../CompositeReadOnlyKeyValueStoreTest.java | 48 +--
.../CompositeReadOnlySessionStoreTest.java | 12 +-
.../CompositeReadOnlyWindowStoreTest.java | 80 +++--
.../state/internals/InMemoryKeyValueStoreTest.java | 48 +--
.../state/internals/InMemoryWindowStoreTest.java | 15 +-
...dSortedCacheKeyValueBytesStoreIteratorTest.java | 39 ++-
8 files changed, 337 insertions(+), 281 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 7c316f4..023d69a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -165,10 +165,11 @@ public class CachingPersistentWindowStoreTest {
this.store = (WindowStore<String, String>)
processorContext.getStateStore("store-name");
int count = 0;
- final KeyValueIterator<Windowed<String>, String> all =
store.all();
- while (all.hasNext()) {
- count++;
- all.next();
+ try (final KeyValueIterator<Windowed<String>, String> all
= store.all()) {
+ while (all.hasNext()) {
+ count++;
+ all.next();
+ }
}
assertThat(count, equalTo(0));
@@ -178,11 +179,13 @@ public class CachingPersistentWindowStoreTest {
public KeyValue<String, String> transform(final String key,
final String value) {
int count = 0;
- final KeyValueIterator<Windowed<String>, String> all =
store.all();
- while (all.hasNext()) {
- count++;
- all.next();
+ try (final KeyValueIterator<Windowed<String>, String> all
= store.all()) {
+ while (all.hasNext()) {
+ count++;
+ all.next();
+ }
}
+
assertThat(count, equalTo(numRecordsProcessed));
store.put(value, value, context.timestamp());
@@ -245,13 +248,14 @@ public class CachingPersistentWindowStoreTest {
assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null));
assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null));
- final WindowStoreIterator<byte[]> a =
cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10));
- final WindowStoreIterator<byte[]> b =
cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
- verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
- verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
- assertFalse(a.hasNext());
- assertFalse(b.hasNext());
- assertEquals(2, cache.size());
+ try (final WindowStoreIterator<byte[]> a =
cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10));
+ final WindowStoreIterator<byte[]> b =
cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) {
+ verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
+ verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
+ assertFalse(a.hasNext());
+ assertFalse(b.hasNext());
+ assertEquals(2, cache.size());
+ }
}
private void verifyKeyValue(final KeyValue<Long, byte[]> next,
@@ -278,18 +282,19 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
- cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10),
ofEpochMilli(10));
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "a");
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "b");
- assertFalse(iterator.hasNext());
- assertEquals(2, cache.size());
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(bytesKey("a"), bytesKey("b"),
ofEpochMilli(10), ofEpochMilli(10))) {
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey("a"), new
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ "a");
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey("b"), new
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ "b");
+ assertFalse(iterator.hasNext());
+ assertEquals(2, cache.size());
+ }
}
@Test
@@ -303,15 +308,16 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.all();
- final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
- for (final String s : array) {
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- s);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.all()) {
+ final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+ for (final String s : array) {
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey(s), new
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ s);
+ }
+ assertFalse(iterator.hasNext());
}
- assertFalse(iterator.hasNext());
}
@Test
@@ -325,15 +331,16 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.backwardAll();
- final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
- for (final String s : array) {
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- s);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.backwardAll()) {
+ final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
+ for (final String s : array) {
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey(s), new
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ s);
+ }
+ assertFalse(iterator.hasNext());
}
- assertFalse(iterator.hasNext());
}
@Test
@@ -343,38 +350,41 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
- cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7));
- for (int i = 0; i < array.length; i++) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7))) {
+ for (int i = 0; i < array.length; i++) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator.hasNext());
}
- assertFalse(iterator.hasNext());
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
- cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4));
- for (int i = 2; i <= 4; i++) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator1.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+ cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4))) {
+ for (int i = 2; i <= 4; i++) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator1.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator1.hasNext());
}
- assertFalse(iterator1.hasNext());
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
- cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7));
- for (int i = 5; i <= 7; i++) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator2.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+ cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7))) {
+ for (int i = 5; i <= 7; i++) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator2.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator2.hasNext());
}
- assertFalse(iterator2.hasNext());
}
@Test
@@ -384,53 +394,57 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
- cachingStore.backwardFetchAll(ofEpochMilli(0), ofEpochMilli(7));
- for (int i = array.length - 1; i >= 0; i--) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.backwardFetchAll(ofEpochMilli(0),
ofEpochMilli(7))) {
+ for (int i = array.length - 1; i >= 0; i--) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator.hasNext());
}
- assertFalse(iterator.hasNext());
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
- cachingStore.backwardFetchAll(ofEpochMilli(2), ofEpochMilli(4));
- for (int i = 4; i >= 2; i--) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator1.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+ cachingStore.backwardFetchAll(ofEpochMilli(2),
ofEpochMilli(4))) {
+ for (int i = 4; i >= 2; i--) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator1.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator1.hasNext());
}
- assertFalse(iterator1.hasNext());
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
- cachingStore.backwardFetchAll(ofEpochMilli(5), ofEpochMilli(7));
- for (int i = 7; i >= 5; i--) {
- final String str = array[i];
- verifyWindowedKeyValue(
- iterator2.next(),
- new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
- str);
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+ cachingStore.backwardFetchAll(ofEpochMilli(5),
ofEpochMilli(7))) {
+ for (int i = 7; i >= 5; i--) {
+ final String str = array[i];
+ verifyWindowedKeyValue(
+ iterator2.next(),
+ new Windowed<>(bytesKey(str), new TimeWindow(i, i +
WINDOW_SIZE)),
+ str);
+ }
+ assertFalse(iterator2.hasNext());
}
- assertFalse(iterator2.hasNext());
}
@Test
public void shouldFlushEvictedItemsIntoUnderlyingStore() {
final int added = addItemsToCache();
// all dirty entries should have been flushed
- final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch(
+ try (final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch(
Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)),
DEFAULT_TIMESTAMP,
- DEFAULT_TIMESTAMP);
- final KeyValue<Bytes, byte[]> next = iter.next();
- assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(next.key));
- assertArrayEquals("0".getBytes(), next.value);
- assertFalse(iter.hasNext());
- assertEquals(added - 1, cache.size());
+ DEFAULT_TIMESTAMP)) {
+ final KeyValue<Bytes, byte[]> next = iter.next();
+ assertEquals(DEFAULT_TIMESTAMP,
keySchema.segmentTimestamp(next.key));
+ assertArrayEquals("0".getBytes(), next.value);
+ assertFalse(iter.hasNext());
+ assertEquals(added - 1, cache.size());
+ }
}
@Test
@@ -515,10 +529,11 @@ public class CachingPersistentWindowStoreTest {
cachingStore.flush();
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
- final WindowStoreIterator<byte[]> fetch =
- cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP),
ofEpochMilli(DEFAULT_TIMESTAMP));
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
- assertFalse(fetch.hasNext());
+ try (final WindowStoreIterator<byte[]> fetch =
+ cachingStore.fetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
+ assertFalse(fetch.hasNext());
+ }
}
@Test
@@ -526,11 +541,12 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final WindowStoreIterator<byte[]> fetch =
- cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP),
ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
- assertFalse(fetch.hasNext());
+ try (final WindowStoreIterator<byte[]> fetch =
+ cachingStore.fetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+ assertFalse(fetch.hasNext());
+ }
}
@Test
@@ -538,11 +554,12 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final WindowStoreIterator<byte[]> fetch =
- cachingStore.backwardFetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
- assertFalse(fetch.hasNext());
+ try (final WindowStoreIterator<byte[]> fetch =
+ cachingStore.backwardFetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+ assertFalse(fetch.hasNext());
+ }
}
@Test
@@ -550,11 +567,12 @@ public class CachingPersistentWindowStoreTest {
final Bytes key = Bytes.wrap("1".getBytes());
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final WindowStoreIterator<byte[]> fetch =
- cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP),
ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
- assertFalse(fetch.hasNext());
+ try (final WindowStoreIterator<byte[]> fetch =
+ cachingStore.fetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+ assertFalse(fetch.hasNext());
+ }
}
@Test
@@ -562,11 +580,12 @@ public class CachingPersistentWindowStoreTest {
final Bytes key = Bytes.wrap("1".getBytes());
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final WindowStoreIterator<byte[]> fetch =
- cachingStore.backwardFetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
- verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
- assertFalse(fetch.hasNext());
+ try (final WindowStoreIterator<byte[]> fetch =
+ cachingStore.backwardFetch(bytesKey("1"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+ verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+ assertFalse(fetch.hasNext());
+ }
}
@Test
@@ -575,17 +594,18 @@ public class CachingPersistentWindowStoreTest {
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
- cachingStore.fetch(key, bytesKey("2"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyWindowedKeyValue(
- fetchRange.next(),
- new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "a");
- verifyWindowedKeyValue(
- fetchRange.next(),
- new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP +
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
- "b");
- assertFalse(fetchRange.hasNext());
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+ cachingStore.fetch(key, bytesKey("2"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyWindowedKeyValue(
+ fetchRange.next(),
+ new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ "a");
+ verifyWindowedKeyValue(
+ fetchRange.next(),
+ new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP +
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+ "b");
+ assertFalse(fetchRange.hasNext());
+ }
}
@Test
@@ -594,17 +614,18 @@ public class CachingPersistentWindowStoreTest {
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
- final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
- cachingStore.backwardFetch(key, bytesKey("2"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
- verifyWindowedKeyValue(
- fetchRange.next(),
- new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP +
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
- "b");
- verifyWindowedKeyValue(
- fetchRange.next(),
- new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "a");
- assertFalse(fetchRange.hasNext());
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+ cachingStore.backwardFetch(key, bytesKey("2"),
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP +
WINDOW_SIZE))) {
+ verifyWindowedKeyValue(
+ fetchRange.next(),
+ new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP +
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+ "b");
+ verifyWindowedKeyValue(
+ fetchRange.next(),
+ new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ "a");
+ assertFalse(fetchRange.hasNext());
+ }
}
@Test
@@ -748,13 +769,14 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2);
cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3);
- final WindowStoreIterator<byte[]> singleKeyIterator =
cachingStore.fetch(bytesKey("aa"), 0L, 5L);
- final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator =
cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L);
+ try (final WindowStoreIterator<byte[]> singleKeyIterator =
cachingStore.fetch(bytesKey("aa"), 0L, 5L);
+ final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator
= cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L)) {
- assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
- assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
- assertFalse(singleKeyIterator.hasNext());
- assertFalse(keyRangeIterator.hasNext());
+ assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
+ assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
+ assertFalse(singleKeyIterator.hasNext());
+ assertFalse(keyRangeIterator.hasNext());
+ }
}
@Test
@@ -764,15 +786,16 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2);
cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3);
- final WindowStoreIterator<byte[]> singleKeyIterator =
- cachingStore.backwardFetch(bytesKey("aa"),
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
- final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator =
- cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"),
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
+ try (final WindowStoreIterator<byte[]> singleKeyIterator =
+ cachingStore.backwardFetch(bytesKey("aa"),
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
+ final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator =
+ cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"),
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L))) {
- assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
- assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
- assertFalse(singleKeyIterator.hasNext());
- assertFalse(keyRangeIterator.hasNext());
+ assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
+ assertEquals(stringFrom(singleKeyIterator.next().value),
stringFrom(keyRangeIterator.next().value));
+ assertFalse(singleKeyIterator.hasNext());
+ assertFalse(keyRangeIterator.hasNext());
+ }
}
@Test
@@ -805,8 +828,8 @@ public class CachingPersistentWindowStoreTest {
final Bytes keyFrom =
Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
final Bytes keyTo =
Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingWindowStore.class)) {
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.fetch(keyFrom, keyTo, 0L, 10L);
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingWindowStore.class);
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.fetch(keyFrom, keyTo, 0L, 10L)) {
assertFalse(iterator.hasNext());
final List<String> messages = appender.getMessages();
@@ -825,9 +848,9 @@ public class CachingPersistentWindowStoreTest {
final Bytes keyFrom =
Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
final Bytes keyTo =
Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingWindowStore.class)) {
- final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
- cachingStore.backwardFetch(keyFrom, keyTo,
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L));
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingWindowStore.class);
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.backwardFetch(keyFrom, keyTo,
Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L))) {
assertFalse(iterator.hasNext());
final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5f524fa..6e8e979 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -206,17 +206,20 @@ public class ChangeLoggingKeyValueBytesStoreTest {
public void shouldGetRecordsWithPrefixKey() {
store.put(hi, there);
store.put(Bytes.increment(hi), world);
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
store.prefixScan(hi.toString(), new StringSerializer());
+
final List<Bytes> keys = new ArrayList<>();
final List<Bytes> values = new ArrayList<>();
int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
- keys.add(next.key);
- values.add(Bytes.wrap(next.value));
- numberOfKeysReturned++;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
store.prefixScan(hi.toString(), new StringSerializer())) {
+ while (keysWithPrefix.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+ keys.add(next.key);
+ values.add(Bytes.wrap(next.value));
+ numberOfKeysReturned++;
+ }
}
+
assertThat(numberOfKeysReturned, is(1));
assertThat(keys, is(Collections.singletonList(hi)));
assertThat(values, is(Collections.singletonList(Bytes.wrap(there))));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 652694b..92182c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -148,63 +148,71 @@ public class CompositeReadOnlyKeyValueStoreTest {
@Test
public void shouldThrowNoSuchElementExceptionWhileNext() {
stubOneUnderlying.put("a", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b");
- keyValueIterator.next();
- assertThrows(NoSuchElementException.class, keyValueIterator::next);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b")) {
+ keyValueIterator.next();
+ assertThrows(NoSuchElementException.class, keyValueIterator::next);
+ }
}
@Test
public void shouldThrowNoSuchElementExceptionWhilePeekNext() {
stubOneUnderlying.put("a", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b");
- keyValueIterator.next();
- assertThrows(NoSuchElementException.class,
keyValueIterator::peekNextKey);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b")) {
+ keyValueIterator.next();
+ assertThrows(NoSuchElementException.class,
keyValueIterator::peekNextKey);
+ }
}
@Test
public void shouldThrowNoSuchElementExceptionWhileNextForPrefixScan() {
stubOneUnderlying.put("a", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
- keyValueIterator.next();
- assertThrows(NoSuchElementException.class, keyValueIterator::next);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer())) {
+ keyValueIterator.next();
+ assertThrows(NoSuchElementException.class, keyValueIterator::next);
+ }
}
@Test
public void shouldThrowNoSuchElementExceptionWhilePeekNextForPrefixScan() {
stubOneUnderlying.put("a", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
- keyValueIterator.next();
- assertThrows(NoSuchElementException.class,
keyValueIterator::peekNextKey);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer())) {
+ keyValueIterator.next();
+ assertThrows(NoSuchElementException.class,
keyValueIterator::peekNextKey);
+ }
}
@Test
public void shouldThrowUnsupportedOperationExceptionWhileRemove() {
- final KeyValueIterator<String, String> keyValueIterator =
theStore.all();
- assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.all()) {
+ assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ }
}
@Test
public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() {
stubOneUnderlying.put("a", "1");
stubOneUnderlying.put("b", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.reverseRange("a", "b");
- assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.reverseRange("a", "b")) {
+ assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ }
}
@Test
public void shouldThrowUnsupportedOperationExceptionWhileRange() {
stubOneUnderlying.put("a", "1");
stubOneUnderlying.put("b", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b");
- assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.range("a", "b")) {
+ assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ }
}
@Test
public void shouldThrowUnsupportedOperationExceptionWhilePrefixScan() {
stubOneUnderlying.put("a", "1");
stubOneUnderlying.put("b", "1");
- final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
- assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ try (final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer())) {
+ assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ }
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index b3da4c6..66ae243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -76,8 +76,9 @@ public class CompositeReadOnlySessionStoreTest {
@Test
public void shouldReturnEmptyIteratorIfNoData() {
- final KeyValueIterator<Windowed<String>, Long> result =
sessionStore.fetch("b");
- assertFalse(result.hasNext());
+ try (final KeyValueIterator<Windowed<String>, Long> result =
sessionStore.fetch("b")) {
+ assertFalse(result.hasNext());
+ }
}
@Test
@@ -104,9 +105,10 @@ public class CompositeReadOnlySessionStoreTest {
otherUnderlyingStore.put(new Windowed<>("foo", new SessionWindow(10,
10)), 10L);
underlyingSessionStore.put(expectedKey, 1L);
- final KeyValueIterator<Windowed<String>, Long> result =
sessionStore.fetch("foo");
- assertEquals(KeyValue.pair(expectedKey, 1L), result.next());
- assertFalse(result.hasNext());
+ try (final KeyValueIterator<Windowed<String>, Long> result =
sessionStore.fetch("foo")) {
+ assertEquals(KeyValue.pair(expectedKey, 1L), result.next());
+ assertFalse(result.hasNext());
+ }
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 7bb7268..3c486c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -80,13 +80,14 @@ public class CompositeReadOnlyWindowStoreTest {
underlyingWindowStore.put("my-key", "my-value", 0L);
underlyingWindowStore.put("my-key", "my-later-value", 10L);
- final WindowStoreIterator<String> iterator =
- windowStore.fetch("my-key", ofEpochMilli(0L), ofEpochMilli(25L));
- final List<KeyValue<Long, String>> results =
StreamsTestUtils.toList(iterator);
+ try (final WindowStoreIterator<String> iterator =
+ windowStore.fetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L))) {
+ final List<KeyValue<Long, String>> results =
StreamsTestUtils.toList(iterator);
- assertEquals(
- asList(new KeyValue<>(0L, "my-value"), new KeyValue<>(10L,
"my-later-value")),
- results);
+ assertEquals(
+ asList(new KeyValue<>(0L, "my-value"), new KeyValue<>(10L,
"my-later-value")),
+ results);
+ }
}
@Test
@@ -94,27 +95,30 @@ public class CompositeReadOnlyWindowStoreTest {
underlyingWindowStore.put("my-key", "my-value", 0L);
underlyingWindowStore.put("my-key", "my-later-value", 10L);
- final WindowStoreIterator<String> iterator =
- windowStore.backwardFetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L));
- final List<KeyValue<Long, String>> results =
StreamsTestUtils.toList(iterator);
+ try (final WindowStoreIterator<String> iterator =
+ windowStore.backwardFetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L))) {
+ final List<KeyValue<Long, String>> results =
StreamsTestUtils.toList(iterator);
- assertEquals(
- asList(new KeyValue<>(10L, "my-later-value"), new KeyValue<>(0L,
"my-value")),
- results);
+ assertEquals(
+ asList(new KeyValue<>(10L, "my-later-value"), new
KeyValue<>(0L, "my-value")),
+ results);
+ }
}
@Test
public void shouldReturnEmptyIteratorIfNoData() {
- final WindowStoreIterator<String> iterator =
- windowStore.fetch("my-key", ofEpochMilli(0L), ofEpochMilli(25L));
- assertFalse(iterator.hasNext());
+ try (final WindowStoreIterator<String> iterator =
+ windowStore.fetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L))) {
+ assertFalse(iterator.hasNext());
+ }
}
@Test
public void shouldReturnBackwardEmptyIteratorIfNoData() {
- final WindowStoreIterator<String> iterator =
- windowStore.backwardFetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L));
- assertFalse(iterator.hasNext());
+ try (final WindowStoreIterator<String> iterator =
+ windowStore.backwardFetch("my-key", ofEpochMilli(0L),
ofEpochMilli(25L))) {
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -251,10 +255,11 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
- store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
+ try (final WindowStoreIterator<Object> windowStoreIterator =
+ store.backwardFetch("key", ofEpochMilli(1),
ofEpochMilli(10))) {
- Assert.assertFalse(windowStoreIterator.hasNext());
+ Assert.assertFalse(windowStoreIterator.hasNext());
+ }
}
@Test
@@ -268,10 +273,11 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
- store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
+ try (final WindowStoreIterator<Object> windowStoreIterator =
+ store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
- Assert.assertFalse(windowStoreIterator.hasNext());
+ Assert.assertFalse(windowStoreIterator.hasNext());
+ }
}
@Test
@@ -285,8 +291,9 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
- assertThrows(NoSuchElementException.class,
windowStoreIterator::peekNextKey);
+ try (final WindowStoreIterator<Object> windowStoreIterator =
store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+ assertThrows(NoSuchElementException.class,
windowStoreIterator::peekNextKey);
+ }
}
@@ -301,9 +308,10 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
- store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
- assertThrows(NoSuchElementException.class,
windowStoreIterator::peekNextKey);
+ try (final WindowStoreIterator<Object> windowStoreIterator =
+ store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+ assertThrows(NoSuchElementException.class,
windowStoreIterator::peekNextKey);
+ }
}
@Test
@@ -317,9 +325,10 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
- store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
- assertThrows(NoSuchElementException.class, windowStoreIterator::next);
+ try (final WindowStoreIterator<Object> windowStoreIterator =
+ store.fetch("key", ofEpochMilli(1), ofEpochMilli(10))) {
+ assertThrows(NoSuchElementException.class,
windowStoreIterator::next);
+ }
}
@Test
@@ -333,9 +342,10 @@ public class CompositeReadOnlyWindowStoreTest {
QueryableStoreTypes.windowStore(),
"foo"
);
- final WindowStoreIterator<Object> windowStoreIterator =
- store.backwardFetch("key", ofEpochMilli(1), ofEpochMilli(10));
- assertThrows(NoSuchElementException.class, windowStoreIterator::next);
+ try (final WindowStoreIterator<Object> windowStoreIterator =
+ store.backwardFetch("key", ofEpochMilli(1),
ofEpochMilli(10))) {
+ assertThrows(NoSuchElementException.class,
windowStoreIterator::next);
+ }
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 87a2063..d67d665 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -127,15 +127,17 @@ public class InMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest {
byteStore.putAll(entries);
byteStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan("prefix", stringSerializer);
final List<String> valuesWithPrefix = new ArrayList<>();
int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
- valuesWithPrefix.add(new String(next.value));
- numberOfKeysReturned++;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan("prefix", stringSerializer)) {
+ while (keysWithPrefix.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+ valuesWithPrefix.add(new String(next.value));
+ numberOfKeysReturned++;
+ }
}
+
assertThat(numberOfKeysReturned, is(3));
assertThat(valuesWithPrefix.get(0), is("f"));
assertThat(valuesWithPrefix.get(1), is("d"));
@@ -160,15 +162,16 @@ public class InMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest {
byteStore.putAll(entries);
byteStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd =
byteStore.prefixScan("abcd", stringSerializer);
- int numberOfKeysReturned = 0;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd =
byteStore.prefixScan("abcd", stringSerializer)) {
+ int numberOfKeysReturned = 0;
- while (keysWithPrefixAsabcd.hasNext()) {
- keysWithPrefixAsabcd.next().key.get();
- numberOfKeysReturned++;
- }
+ while (keysWithPrefixAsabcd.hasNext()) {
+ keysWithPrefixAsabcd.next().key.get();
+ numberOfKeysReturned++;
+ }
- assertThat(numberOfKeysReturned, is(1));
+ assertThat(numberOfKeysReturned, is(1));
+ }
}
@Test
@@ -188,14 +191,15 @@ public class InMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest {
byteStore.putAll(entries);
byteStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan(prefix, stringSerializer);
final List<String> valuesWithPrefix = new ArrayList<>();
int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
- valuesWithPrefix.add(new String(next.value));
- numberOfKeysReturned++;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan(prefix, stringSerializer)) {
+ while (keysWithPrefix.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+ valuesWithPrefix.add(new String(next.value));
+ numberOfKeysReturned++;
+ }
}
assertThat(numberOfKeysReturned, is(1));
@@ -217,13 +221,15 @@ public class InMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest {
byteStore.putAll(entries);
byteStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan("bb", stringSerializer);
int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- keysWithPrefix.next();
- numberOfKeysReturned++;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
byteStore.prefixScan("bb", stringSerializer)) {
+ while (keysWithPrefix.hasNext()) {
+ keysWithPrefix.next();
+ numberOfKeysReturned++;
+ }
}
+
assertThat(numberOfKeysReturned, is(0));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index b18e8a7..5150839 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -75,13 +75,14 @@ public class InMemoryWindowStoreTest extends
AbstractWindowBytesStoreTest {
serdes.rawValue("three")));
context.restore(STORE_NAME, restorableEntries);
- final KeyValueIterator<Windowed<Integer>, String> iterator =
windowStore
- .fetchAll(0L, 2 * WINDOW_SIZE);
-
- assertEquals(windowedPair(1, "one", 0L), iterator.next());
- assertEquals(windowedPair(2, "two", WINDOW_SIZE), iterator.next());
- assertEquals(windowedPair(3, "three", 2 * WINDOW_SIZE),
iterator.next());
- assertFalse(iterator.hasNext());
+ try (final KeyValueIterator<Windowed<Integer>, String> iterator =
windowStore
+ .fetchAll(0L, 2 * WINDOW_SIZE)) {
+
+ assertEquals(windowedPair(1, "one", 0L), iterator.next());
+ assertEquals(windowedPair(2, "two", WINDOW_SIZE), iterator.next());
+ assertEquals(windowedPair(3, "three", 2 * WINDOW_SIZE),
iterator.next());
+ assertFalse(iterator.hasNext());
+ }
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index b8075d4..1716ac1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -99,9 +99,10 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest
{
final byte[][] bytes = {{0}, {1}};
store.put(Bytes.wrap(bytes[0]), bytes[0]);
cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
- final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator();
- assertArrayEquals(bytes[0], iterator.next().key.get());
- assertFalse(iterator.hasNext());
+ try (final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator()) {
+ assertArrayEquals(bytes[0], iterator.next().key.get());
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -109,9 +110,10 @@ public class
MergedSortedCacheKeyValueBytesStoreIteratorTest {
final byte[][] bytes = {{0}, {1}};
cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
store.put(Bytes.wrap(bytes[1]), bytes[1]);
- final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator();
- assertArrayEquals(bytes[1], iterator.next().key.get());
- assertFalse(iterator.hasNext());
+ try (final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator()) {
+ assertArrayEquals(bytes[1], iterator.next().key.get());
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -119,8 +121,9 @@ public class
MergedSortedCacheKeyValueBytesStoreIteratorTest {
final byte[][] bytes = {{0}};
cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
store.put(Bytes.wrap(bytes[0]), bytes[0]);
- final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator();
- assertFalse(iterator.hasNext());
+ try (final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator()) {
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -157,16 +160,16 @@ public class
MergedSortedCacheKeyValueBytesStoreIteratorTest {
cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
- final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator();
- assertArrayEquals(bytes[0], iterator.next().key.get());
- assertArrayEquals(bytes[4], iterator.next().key.get());
- assertArrayEquals(bytes[5], iterator.next().key.get());
- assertArrayEquals(bytes[6], iterator.next().key.get());
- assertArrayEquals(bytes[7], iterator.next().key.get());
- assertArrayEquals(bytes[9], iterator.next().key.get());
- assertArrayEquals(bytes[10], iterator.next().key.get());
- assertFalse(iterator.hasNext());
-
+ try (final MergedSortedCacheKeyValueBytesStoreIterator iterator =
createIterator()) {
+ assertArrayEquals(bytes[0], iterator.next().key.get());
+ assertArrayEquals(bytes[4], iterator.next().key.get());
+ assertArrayEquals(bytes[5], iterator.next().key.get());
+ assertArrayEquals(bytes[6], iterator.next().key.get());
+ assertArrayEquals(bytes[7], iterator.next().key.get());
+ assertArrayEquals(bytes[9], iterator.next().key.get());
+ assertArrayEquals(bytes[10], iterator.next().key.get());
+ assertFalse(iterator.hasNext());
+ }
}
@Test