This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9aeebe KAFKA-13123: close KeyValueIterator instances in example code and tests (#11105)
f9aeebe is described below
commit f9aeebed054000934af7555f39085f5caecf34bc
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 27 07:23:04 2021 +0800
KAFKA-13123: close KeyValueIterator instances in example code and tests (#11105)
Reviewers: Matthias J. Sax <[email protected]>
---
.../examples/docs/DeveloperGuideTesting.java | 9 +-
.../streams/integration/EosIntegrationTest.java | 11 +-
.../KStreamAggregationIntegrationTest.java | 9 +-
.../integration/QueryableStateIntegrationTest.java | 26 +-
...KStreamSessionWindowAggregateProcessorTest.java | 46 +--
.../processor/internals/ProcessorTopologyTest.java | 351 +++++++--------------
.../streams/state/internals/RocksDBStoreTest.java | 209 ++++++------
.../state/internals/RocksDBWindowStoreTest.java | 17 +-
.../state/internals/SessionKeySchemaTest.java | 8 +
.../state/internals/WindowKeySchemaTest.java | 14 +-
10 files changed, 291 insertions(+), 409 deletions(-)
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 7b54486..41b61e3 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -173,10 +173,11 @@ public class DeveloperGuideTesting {
}
private void flushStore(final long timestamp) {
- final KeyValueIterator<String, Long> it = store.all();
- while (it.hasNext()) {
- final KeyValue<String, Long> next = it.next();
- context.forward(new Record<>(next.key, next.value, timestamp));
+ try (final KeyValueIterator<String, Long> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String, Long> next = it.next();
+ context.forward(new Record<>(next.key, next.value, timestamp));
+ }
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 343a105..0d5c187 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -993,11 +993,12 @@ public class EosIntegrationTest {
.getStore(300_000L, storeName, streams,
QueryableStoreTypes.keyValueStore());
assertNotNull(store);
- final KeyValueIterator<Long, Long> it = store.all();
- while (it.hasNext()) {
- assertTrue(reason, expectedStoreContent.remove(it.next()));
- }
+ try (final KeyValueIterator<Long, Long> it = store.all()) {
+ while (it.hasNext()) {
+ assertTrue(reason, expectedStoreContent.remove(it.next()));
+ }
- assertTrue(reason, expectedStoreContent.isEmpty());
+ assertTrue(reason, expectedStoreContent.isEmpty());
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 1b92ab5..e581903 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -923,10 +923,11 @@ public class KStreamAggregationIntegrationTest {
final ReadOnlySessionStore<String, String> sessionStore =
IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams,
QueryableStoreTypes.sessionStore());
- final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
- assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
- assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
- assertFalse(bob.hasNext());
+ try (final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob")) {
+ assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
+ assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
+ assertFalse(bob.hasNext());
+ }
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index af954e64..15b9ea6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -846,9 +846,10 @@ public class QueryableStateIntegrationTest {
assertEquals(Long.valueOf(batchEntry.value),
myMapStore.get(batchEntry.key));
}
- final KeyValueIterator<String, Long> range = myMapStore.range("hello",
"kafka");
- while (range.hasNext()) {
- System.out.println(range.next());
+ try (final KeyValueIterator<String, Long> range =
myMapStore.range("hello", "kafka")) {
+ while (range.hasNext()) {
+ System.out.println(range.next());
+ }
}
}
@@ -899,9 +900,10 @@ public class QueryableStateIntegrationTest {
IntegrationTestUtils.getStore("queryMapValues", kafkaStreams,
keyValueStore());
int index = 0;
- final KeyValueIterator<String, Long> range =
myMapStore.prefixScan("go", Serdes.String().serializer());
- while (range.hasNext()) {
- assertEquals(expectedPrefixScanResult.get(index++), range.next());
+ try (final KeyValueIterator<String, Long> range =
myMapStore.prefixScan("go", Serdes.String().serializer())) {
+ while (range.hasNext()) {
+ assertEquals(expectedPrefixScanResult.get(index++),
range.next());
+ }
}
}
@@ -1234,12 +1236,14 @@ public class QueryableStateIntegrationTest {
private Set<KeyValue<String, Long>> fetch(final
ReadOnlyWindowStore<String, Long> store,
final String key) {
- final WindowStoreIterator<Long> fetch =
- store.fetch(key, ofEpochMilli(0),
ofEpochMilli(System.currentTimeMillis()));
- if (fetch.hasNext()) {
- final KeyValue<Long, Long> next = fetch.next();
- return Collections.singleton(KeyValue.pair(key, next.value));
+ try (final WindowStoreIterator<Long> fetch =
+ store.fetch(key, ofEpochMilli(0),
ofEpochMilli(System.currentTimeMillis()))) {
+ if (fetch.hasNext()) {
+ final KeyValue<Long, Long> next = fetch.next();
+ return Collections.singleton(KeyValue.pair(key, next.value));
+ }
}
+
return Collections.emptySet();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f4ebfdd..a897d11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -157,10 +157,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(500);
processor.process("john", "second");
- final KeyValueIterator<Windowed<String>, Long> values =
- sessionStore.findSessions("john", 0, 2000);
- assertTrue(values.hasNext());
- assertEquals(Long.valueOf(2), values.next().value);
+ try (final KeyValueIterator<Windowed<String>, Long> values =
+ sessionStore.findSessions("john", 0, 2000)) {
+ assertTrue(values.hasNext());
+ assertEquals(Long.valueOf(2), values.next().value);
+ }
}
@Test
@@ -180,12 +181,13 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(GAP_MS / 2);
processor.process(sessionId, "third");
- final KeyValueIterator<Windowed<String>, Long> iterator =
- sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
- final KeyValue<Windowed<String>, Long> kv = iterator.next();
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) {
+ final KeyValue<Windowed<String>, Long> kv = iterator.next();
- assertEquals(Long.valueOf(3), kv.value);
- assertFalse(iterator.hasNext());
+ assertEquals(Long.valueOf(3), kv.value);
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -193,10 +195,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(0);
processor.process("mel", "first");
processor.process("mel", "second");
- final KeyValueIterator<Windowed<String>, Long> iterator =
- sessionStore.findSessions("mel", 0, 0);
- assertEquals(Long.valueOf(2L), iterator.next().value);
- assertFalse(iterator.hasNext());
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("mel", 0, 0)) {
+ assertEquals(Long.valueOf(2L), iterator.next().value);
+ assertFalse(iterator.hasNext());
+ }
}
@Test
@@ -240,18 +243,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.process("a", "1");
// first ensure it is in the store
- final KeyValueIterator<Windowed<String>, Long> a1 =
- sessionStore.findSessions("a", 0, 0);
- assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0,
0)), 1L), a1.next());
+ try (final KeyValueIterator<Windowed<String>, Long> a1 =
+ sessionStore.findSessions("a", 0, 0)) {
+ assertEquals(KeyValue.pair(new Windowed<>("a", new
SessionWindow(0, 0)), 1L), a1.next());
+ }
+
context.setTime(100);
processor.process("a", "2");
// a1 from above should have been removed
// should have merged session in store
- final KeyValueIterator<Windowed<String>, Long> a2 =
- sessionStore.findSessions("a", 0, 100);
- assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0,
100)), 2L), a2.next());
- assertFalse(a2.hasNext());
+ try (final KeyValueIterator<Windowed<String>, Long> a2 =
+ sessionStore.findSessions("a", 0, 100)) {
+ assertEquals(KeyValue.pair(new Windowed<>("a", new
SessionWindow(0, 100)), 2L), a2.next());
+ assertFalse(a2.hasNext());
+ }
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a0dc50e..de26c7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -84,6 +84,9 @@ public class ProcessorTopologyTest {
private static final Serializer<String> STRING_SERIALIZER = new
StringSerializer();
private static final Deserializer<String> STRING_DESERIALIZER = new
StringDeserializer();
+ private static final String DEFAULT_STORE_NAME = "prefixScanStore";
+ private static final String DEFAULT_PREFIX = "key";
+
private static final String INPUT_TOPIC_1 = "input-topic-1";
private static final String INPUT_TOPIC_2 = "input-topic-2";
private static final String OUTPUT_TOPIC_1 = "output-topic-1";
@@ -118,6 +121,18 @@ public class ProcessorTopologyTest {
driver = null;
}
+ private List<KeyValue<String, String>> prefixScanResults(final KeyValueStore<String, String> store, final String prefix) {
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ try (final KeyValueIterator<String, String> prefixScan = store.prefixScan(prefix, Serdes.String().serializer())) {
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+ }
+
+ return results;
+ }
+
@Test
public void testTopologyMetadata() {
topology.addSource("source-1", "topic-1");
@@ -389,14 +404,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -411,13 +425,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -430,14 +439,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -452,13 +460,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -471,14 +474,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -493,13 +495,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -512,14 +509,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanPersistentStoreNoCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -534,13 +530,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -553,14 +544,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanPersistentStoreWithCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -575,13 +565,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -594,14 +579,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanPersistentStoreWithCachingWithLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -616,13 +600,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -635,14 +614,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -657,13 +635,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -676,14 +649,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging()
{
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -698,13 +670,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -717,14 +684,13 @@ public class ProcessorTopologyTest {
@Test
public void
testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -739,13 +705,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -758,14 +719,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanLruMapNoCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -780,13 +740,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -799,14 +754,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanLruMapWithCachingNoLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -821,13 +775,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -840,14 +789,13 @@ public class ProcessorTopologyTest {
@Test
public void testPrefixScanLruMapWithCachingWithLogging() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
"source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -862,13 +810,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -882,14 +825,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -904,13 +846,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -924,14 +861,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -946,13 +882,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -966,14 +897,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -988,13 +918,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1008,14 +933,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1030,13 +954,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1050,14 +969,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1072,13 +990,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1092,14 +1005,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1114,13 +1026,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1134,14 +1041,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1156,13 +1062,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1176,14 +1077,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1198,13 +1098,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1218,14 +1113,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void
testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1240,13 +1134,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1260,14 +1149,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingDisabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1282,13 +1170,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1302,14 +1185,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingDisabled();
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1324,13 +1206,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
@@ -1344,14 +1221,13 @@ public class ProcessorTopologyTest {
@Deprecated // testing old PAPI
@Test
public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
- final String storeName = "prefixScanStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100),
Serdes.String(), Serdes.String())
+ Stores.keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME,
100), Serdes.String(), Serdes.String())
.withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
- .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
"source1")
+ .addProcessor("processor1", defineWithStoresOldAPI(() -> new
OldAPIStatefulProcessor(DEFAULT_STORE_NAME),
Collections.singleton(storeBuilder)), "source1")
.addSink("counts", OUTPUT_TOPIC_1, "processor1");
driver = new TopologyTestDriver(topology, props);
@@ -1366,13 +1242,8 @@ public class ProcessorTopologyTest {
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
- final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
- final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
- final List<KeyValue<String, String>> results = new ArrayList<>();
- while (prefixScan.hasNext()) {
- final KeyValue<String, String> next = prefixScan.next();
- results.add(next);
- }
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
assertEquals("key1", results.get(0).key);
assertEquals("value4", results.get(0).value);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index e8a05ff..573cbe6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -52,6 +52,7 @@ import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
@@ -89,7 +90,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
@@ -407,19 +407,20 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.putAll(entries);
rocksDBStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan("prefix", stringSerializer);
- final List<String> valuesWithPrefix = new ArrayList<>();
- int numberOfKeysReturned = 0;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan("prefix", stringSerializer)) {
+ final List<String> valuesWithPrefix = new ArrayList<>();
+ int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
- valuesWithPrefix.add(new String(next.value));
- numberOfKeysReturned++;
+ while (keysWithPrefix.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+ valuesWithPrefix.add(new String(next.value));
+ numberOfKeysReturned++;
+ }
+ assertThat(numberOfKeysReturned, is(3));
+ assertThat(valuesWithPrefix.get(0), is("f"));
+ assertThat(valuesWithPrefix.get(1), is("d"));
+ assertThat(valuesWithPrefix.get(2), is("b"));
}
- assertThat(numberOfKeysReturned, is(3));
- assertThat(valuesWithPrefix.get(0), is("f"));
- assertThat(valuesWithPrefix.get(1), is("d"));
- assertThat(valuesWithPrefix.get(2), is("b"));
}
@Test
@@ -441,15 +442,16 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.putAll(entries);
rocksDBStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd =
rocksDBStore.prefixScan("abcd", stringSerializer);
- int numberOfKeysReturned = 0;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd =
rocksDBStore.prefixScan("abcd", stringSerializer)) {
+ int numberOfKeysReturned = 0;
- while (keysWithPrefixAsabcd.hasNext()) {
- keysWithPrefixAsabcd.next().key.get();
- numberOfKeysReturned++;
- }
+ while (keysWithPrefixAsabcd.hasNext()) {
+ keysWithPrefixAsabcd.next().key.get();
+ numberOfKeysReturned++;
+ }
- assertThat(numberOfKeysReturned, is(1));
+ assertThat(numberOfKeysReturned, is(1));
+ }
}
@Test
@@ -472,21 +474,22 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.putAll(entries);
rocksDBStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan(prefix, stringSerializer);
- final List<String> valuesWithPrefix = new ArrayList<>();
- int numberOfKeysReturned = 0;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan(prefix, stringSerializer)) {
+ final List<String> valuesWithPrefix = new ArrayList<>();
+ int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
- valuesWithPrefix.add(new String(next.value));
- numberOfKeysReturned++;
- }
+ while (keysWithPrefix.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+ valuesWithPrefix.add(new String(next.value));
+ numberOfKeysReturned++;
+ }
- assertThat(numberOfKeysReturned, is(numMatches));
- if (numMatches == 2) {
- assertThat(valuesWithPrefix.get(0), either(is("a")).or(is("b")));
- } else {
- assertThat(valuesWithPrefix.get(0), is("a"));
+ assertThat(numberOfKeysReturned, is(numMatches));
+ if (numMatches == 2) {
+ assertThat(valuesWithPrefix.get(0),
either(is("a")).or(is("b")));
+ } else {
+ assertThat(valuesWithPrefix.get(0), is("a"));
+ }
}
}
@@ -506,14 +509,15 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.putAll(entries);
rocksDBStore.flush();
- final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan("d", stringSerializer);
- int numberOfKeysReturned = 0;
+ try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix =
rocksDBStore.prefixScan("d", stringSerializer)) {
+ int numberOfKeysReturned = 0;
- while (keysWithPrefix.hasNext()) {
- keysWithPrefix.next();
- numberOfKeysReturned++;
+ while (keysWithPrefix.hasNext()) {
+ keysWithPrefix.next();
+ numberOfKeysReturned++;
+ }
+ assertThat(numberOfKeysReturned, is(0));
}
- assertThat(numberOfKeysReturned, is(0));
}
@Test
@@ -562,14 +566,15 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
- final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
- final Set<String> keys = new HashSet<>();
+ try (final KeyValueIterator<Bytes, byte[]> iterator =
rocksDBStore.all()) {
+ final Set<String> keys = new HashSet<>();
- while (iterator.hasNext()) {
- keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
- }
+ while (iterator.hasNext()) {
+ keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
+ }
- assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+ assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+ }
}
@Test
@@ -586,30 +591,31 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
- final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
- final Set<String> keys = new HashSet<>();
+ try (final KeyValueIterator<Bytes, byte[]> iterator =
rocksDBStore.all()) {
+ final Set<String> keys = new HashSet<>();
- while (iterator.hasNext()) {
- keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
- }
-
- assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
+ while (iterator.hasNext()) {
+ keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
+ }
- assertEquals(
- "restored",
- stringDeserializer.deserialize(
- null,
- rocksDBStore.get(new Bytes(stringSerializer.serialize(null,
"1")))));
- assertEquals(
- "b",
- stringDeserializer.deserialize(
- null,
- rocksDBStore.get(new Bytes(stringSerializer.serialize(null,
"2")))));
- assertEquals(
- "c",
- stringDeserializer.deserialize(
- null,
- rocksDBStore.get(new Bytes(stringSerializer.serialize(null,
"3")))));
+ assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
+
+ assertEquals(
+ "restored",
+ stringDeserializer.deserialize(
+ null,
+ rocksDBStore.get(new
Bytes(stringSerializer.serialize(null, "1")))));
+ assertEquals(
+ "b",
+ stringDeserializer.deserialize(
+ null,
+ rocksDBStore.get(new
Bytes(stringSerializer.serialize(null, "2")))));
+ assertEquals(
+ "c",
+ stringDeserializer.deserialize(
+ null,
+ rocksDBStore.get(new
Bytes(stringSerializer.serialize(null, "3")))));
+ }
}
@Test
@@ -644,14 +650,15 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
context.restore(rocksDBStore.name(), entries);
- final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
- final Set<String> keys = new HashSet<>();
+ try (final KeyValueIterator<Bytes, byte[]> iterator =
rocksDBStore.all()) {
+ final Set<String> keys = new HashSet<>();
- while (iterator.hasNext()) {
- keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
- }
+ while (iterator.hasNext()) {
+ keys.add(stringDeserializer.deserialize(null,
iterator.next().key.get()));
+ }
- assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+ assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+ }
}
@Test
@@ -872,10 +879,11 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
- final KeyValueIterator<Integer, String> range = store.range(1, 2);
- assertEquals("hi", range.next().value);
- assertEquals("goodbye", range.next().value);
- assertFalse(range.hasNext());
+ try (final KeyValueIterator<Integer, String> range = store.range(1,
2)) {
+ assertEquals("hi", range.next().value);
+ assertEquals("goodbye", range.next().value);
+ assertFalse(range.hasNext());
+ }
}
@Test
@@ -883,10 +891,11 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
- final KeyValueIterator<Integer, String> range = store.all();
- assertEquals("hi", range.next().value);
- assertEquals("goodbye", range.next().value);
- assertFalse(range.hasNext());
+ try (final KeyValueIterator<Integer, String> range = store.all()) {
+ assertEquals("hi", range.next().value);
+ assertEquals("goodbye", range.next().value);
+ assertFalse(range.hasNext());
+ }
}
@Test
@@ -894,40 +903,18 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
- final KeyValueIterator<Integer, String> iteratorOne = store.range(1,
5);
- final KeyValueIterator<Integer, String> iteratorTwo = store.range(1,
4);
-
- assertTrue(iteratorOne.hasNext());
- assertTrue(iteratorTwo.hasNext());
+ try (final KeyValueIterator<Integer, String> iteratorOne =
store.range(1, 5);
+ final KeyValueIterator<Integer, String> iteratorTwo =
store.range(1, 4)) {
- store.close();
+ assertTrue(iteratorOne.hasNext());
+ assertTrue(iteratorTwo.hasNext());
- try {
- iteratorOne.hasNext();
- fail("should have thrown InvalidStateStoreException on closed
store");
- } catch (final InvalidStateStoreException e) {
- // ok
- }
+ store.close();
- try {
- iteratorOne.next();
- fail("should have thrown InvalidStateStoreException on closed
store");
- } catch (final InvalidStateStoreException e) {
- // ok
- }
-
- try {
- iteratorTwo.hasNext();
- fail("should have thrown InvalidStateStoreException on closed
store");
- } catch (final InvalidStateStoreException e) {
- // ok
- }
-
- try {
- iteratorTwo.next();
- fail("should have thrown InvalidStateStoreException on closed
store");
- } catch (final InvalidStateStoreException e) {
- // ok
+ Assertions.assertThrows(InvalidStateStoreException.class, () ->
iteratorOne.hasNext());
+ Assertions.assertThrows(InvalidStateStoreException.class, () ->
iteratorOne.next());
+ Assertions.assertThrows(InvalidStateStoreException.class, () ->
iteratorTwo.hasNext());
+ Assertions.assertThrows(InvalidStateStoreException.class, () ->
iteratorTwo.next());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 3bb9f14..7da5245 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -79,16 +79,17 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
windowStore.put(1, "three", currentTime);
- final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L,
currentTime);
+ try (final WindowStoreIterator<String> iterator = windowStore.fetch(1,
0L, currentTime)) {
- // roll to the next segment that will close the first
- currentTime = currentTime + SEGMENT_INTERVAL;
- windowStore.put(1, "four", currentTime);
+ // roll to the next segment that will close the first
+ currentTime = currentTime + SEGMENT_INTERVAL;
+ windowStore.put(1, "four", currentTime);
- // should only have 2 values as the first segment is no longer open
- assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), iterator.next());
- assertEquals(new KeyValue<>(2 * SEGMENT_INTERVAL, "three"),
iterator.next());
- assertFalse(iterator.hasNext());
+ // should only have 2 values as the first segment is no longer open
+ assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"),
iterator.next());
+ assertEquals(new KeyValue<>(2 * SEGMENT_INTERVAL, "three"),
iterator.next());
+ assertFalse(iterator.hasNext());
+ }
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index cd34fb0..40b06c0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -54,6 +55,13 @@ public class SessionKeySchemaTest {
private final SessionKeySchema sessionKeySchema = new SessionKeySchema();
private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+ @After
+ public void after() {
+ if (iterator != null) {
+ iterator.close();
+ }
+ }
+
@Before
public void before() {
final List<KeyValue<Bytes, Integer>> keys =
Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new
Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index bc17abf..8f0ca83 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -63,14 +63,16 @@ public class WindowKeySchemaTest {
KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new
Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(10, 20)), 4), 4),
KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new
Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(10, 20)), 5), 5),
KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new
Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(10, 20)), 6), 6));
- final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new
DelegatingPeekingKeyValueIterator<>("foo", new
KeyValueIteratorStub<>(keys.iterator()));
+ try (final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator
= new DelegatingPeekingKeyValueIterator<>("foo", new
KeyValueIteratorStub<>(keys.iterator()))) {
- final HasNextCondition hasNextCondition =
windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
- final List<Integer> results = new ArrayList<>();
- while (hasNextCondition.hasNext(iterator)) {
- results.add(iterator.next().value);
+ final HasNextCondition hasNextCondition =
windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
+ final List<Integer> results = new ArrayList<>();
+ while (hasNextCondition.hasNext(iterator)) {
+ results.add(iterator.next().value);
+ }
+
+ assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
}
- assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
}
@Test