This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a2aa3b9 close TopologyTestDriver to release resources (#11143)
a2aa3b9 is described below
commit a2aa3b9ea5f4e1cbe81c62400a6dc37bd9e9aadf
Author: Luke Chen <[email protected]>
AuthorDate: Fri Jul 30 03:50:57 2021 +0800
close TopologyTestDriver to release resources (#11143)
Close TopologyTestDriver to release resources
Reviewers: Bill Bejeck <[email protected]>
---
.../kafka/streams/kstream/internals/AbstractStreamTest.java | 13 +++++++------
.../streams/processor/internals/ProcessorNodeTest.java | 13 +++++++------
2 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 067a531..ae7ef8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -81,14 +81,15 @@ public class AbstractStreamTest {
stream.randomFilter().process(supplier);
- final TopologyTestDriver driver = new
TopologyTestDriver(builder.build());
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build())) {
- final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(topicName, new IntegerSerializer(), new
StringSerializer());
- for (final int expectedKey : expectedKeys) {
- inputTopic.pipeInput(expectedKey, "V" + expectedKey);
- }
+ final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(topicName, new IntegerSerializer(), new
StringSerializer());
+ for (final int expectedKey : expectedKeys) {
+ inputTopic.pipeInput(expectedKey, "V" + expectedKey);
+ }
- assertTrue(supplier.theCapturedProcessor().processed().size() <=
expectedKeys.length);
+ assertTrue(supplier.theCapturedProcessor().processed().size() <=
expectedKeys.length);
+ }
}
private static class ExtendedKStream<K, V> extends AbstractStream<K, V> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index f82641b..73147ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -148,13 +148,14 @@ public class ProcessorNodeTest {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class);
- final TopologyTestDriver testDriver = new TopologyTestDriver(topology,
config);
- final TestInputTopic<String, String> topic =
testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(),
new StringSerializer());
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(topology, config)) {
+ final TestInputTopic<String, String> topic =
testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(),
new StringSerializer());
- final StreamsException se = assertThrows(StreamsException.class, () ->
topic.pipeInput("a-key", "a value"));
- final String msg = se.getMessage();
- assertTrue("Error about class cast with serdes",
msg.contains("ClassCastException"));
- assertTrue("Error about class cast with serdes",
msg.contains("Serdes"));
+ final StreamsException se = assertThrows(StreamsException.class,
() -> topic.pipeInput("a-key", "a value"));
+ final String msg = se.getMessage();
+ assertTrue("Error about class cast with serdes",
msg.contains("ClassCastException"));
+ assertTrue("Error about class cast with serdes",
msg.contains("Serdes"));
+ }
}
@Test