This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23180770d7026a348bf8448cbd5ebdc032b05093
Author: Fabian Paul <[email protected]>
AuthorDate: Wed Dec 8 15:39:45 2021 +0100

    [hotfix][tests] Close Kafka AdminClients to prevent resource leaks
---
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java     | 9 +++++----
 .../streaming/connectors/kafka/table/KafkaTableTestBase.java     | 3 +--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 141924b..5b2f62d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -353,10 +353,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
     @Override
     public int getLeaderToShutDown(String topic) throws Exception {
-        AdminClient client = AdminClient.create(getStandardProperties());
-        TopicDescription result =
-                client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
-        return result.partitions().get(0).leader().id();
+        try (final AdminClient client = AdminClient.create(getStandardProperties())) {
+            TopicDescription result =
+                    client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
+            return result.partitions().get(0).leader().id();
+        }
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 28c15a8..560d8d8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -170,8 +170,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
     }
 
     private Map<String, TopicDescription> describeExternalTopics() {
-        final AdminClient adminClient = AdminClient.create(getStandardProps());
-        try {
+        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
             final List<String> topics =
                     adminClient.listTopics().listings().get().stream()
                             .filter(listing -> !listing.isInternal())

Reply via email to