This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 23180770d7026a348bf8448cbd5ebdc032b05093 Author: Fabian Paul <[email protected]> AuthorDate: Wed Dec 8 15:39:45 2021 +0100 [hotfix][tests] Close Kafka AdminClients to prevent resource leaks --- .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 9 +++++---- .../streaming/connectors/kafka/table/KafkaTableTestBase.java | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 141924b..5b2f62d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -353,10 +353,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public int getLeaderToShutDown(String topic) throws Exception { - AdminClient client = AdminClient.create(getStandardProperties()); - TopicDescription result = - client.describeTopics(Collections.singleton(topic)).all().get().get(topic); - return result.partitions().get(0).leader().id(); + try (final AdminClient client = AdminClient.create(getStandardProperties())) { + TopicDescription result = + client.describeTopics(Collections.singleton(topic)).all().get().get(topic); + return result.partitions().get(0).leader().id(); + } } @Override diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java index 28c15a8..560d8d8 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java @@ -170,8 +170,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase { } private Map<String, TopicDescription> describeExternalTopics() { - final AdminClient adminClient = AdminClient.create(getStandardProps()); - try { + try (final AdminClient adminClient = AdminClient.create(getStandardProps())) { final List<String> topics = adminClient.listTopics().listings().get().stream() .filter(listing -> !listing.isInternal())
