This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a2aa3b9  close TopologyTestDriver to release resources (#11143)
a2aa3b9 is described below

commit a2aa3b9ea5f4e1cbe81c62400a6dc37bd9e9aadf
Author: Luke Chen <[email protected]>
AuthorDate: Fri Jul 30 03:50:57 2021 +0800

    close TopologyTestDriver to release resources (#11143)
    
    Close TopologyTestDriver to release resources
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../kafka/streams/kstream/internals/AbstractStreamTest.java | 13 +++++++------
 .../streams/processor/internals/ProcessorNodeTest.java      | 13 +++++++------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 067a531..ae7ef8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -81,14 +81,15 @@ public class AbstractStreamTest {
 
         stream.randomFilter().process(supplier);
 
-        final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build())) {
 
-        final TestInputTopic<Integer, String> inputTopic = 
driver.createInputTopic(topicName, new IntegerSerializer(), new 
StringSerializer());
-        for (final int expectedKey : expectedKeys) {
-            inputTopic.pipeInput(expectedKey, "V" + expectedKey);
-        }
+            final TestInputTopic<Integer, String> inputTopic = 
driver.createInputTopic(topicName, new IntegerSerializer(), new 
StringSerializer());
+            for (final int expectedKey : expectedKeys) {
+                inputTopic.pipeInput(expectedKey, "V" + expectedKey);
+            }
 
-        assertTrue(supplier.theCapturedProcessor().processed().size() <= 
expectedKeys.length);
+            assertTrue(supplier.theCapturedProcessor().processed().size() <= 
expectedKeys.length);
+        }
     }
 
     private static class ExtendedKStream<K, V> extends AbstractStream<K, V> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index f82641b..73147ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -148,13 +148,14 @@ public class ProcessorNodeTest {
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class);
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class);
 
-        final TopologyTestDriver testDriver = new TopologyTestDriver(topology, 
config);
-        final TestInputTopic<String, String> topic = 
testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), 
new StringSerializer());
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, config)) {
+            final TestInputTopic<String, String> topic = 
testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), 
new StringSerializer());
 
-        final StreamsException se = assertThrows(StreamsException.class, () -> 
topic.pipeInput("a-key", "a value"));
-        final String msg = se.getMessage();
-        assertTrue("Error about class cast with serdes", 
msg.contains("ClassCastException"));
-        assertTrue("Error about class cast with serdes", 
msg.contains("Serdes"));
+            final StreamsException se = assertThrows(StreamsException.class, 
() -> topic.pipeInput("a-key", "a value"));
+            final String msg = se.getMessage();
+            assertTrue("Error about class cast with serdes", 
msg.contains("ClassCastException"));
+            assertTrue("Error about class cast with serdes", 
msg.contains("Serdes"));
+        }
     }
 
     @Test

Reply via email to