This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dd82542493e KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166) dd82542493e is described below commit dd82542493e2350bb6b1f01b1f716656b02804da Author: Bill Bejeck <bbej...@apache.org> AuthorDate: Mon Jul 14 20:19:16 2025 -0400 KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166) The `AdminClient` adds a telemetry reporter to the metrics reporters list in the constructor. The problem is that the reporter was already added in the `createInternal` method. In the `createInternal` method call, the `clientTelemetryReporter` is added to a `List<MetricReporters>` which is passed to the `Metrics` object, will get closed when `Metrics.close()` is called. But adding a reporter to the reporters list in the constructor is not used by the `Metrics` object and hence doesn't get closed, causing a memory leak. All related tests pass after this change. Reviewers: Apoorv Mittal <apoorvmitta...@apache.org>, Matthias J. Sax <matth...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>, Jhen-Yung Hsu <jhenyung...@gmail.com> --- .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e1be4304950..ac2d67d2022 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -579,10 +579,12 @@ public class KafkaAdminClient extends AdminClient { Time time) { Metrics metrics = null; String clientId = generateClientId(config); + List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config); Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + clientTelemetryReporter.ifPresent(reporters::add); try { - metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); + metrics = new Metrics(new MetricConfig(), reporters, time); LogContext logContext = createLogContext(clientId); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, client, null, logContext, clientTelemetryReporter); @@ -627,9 +629,7 @@ public class KafkaAdminClient extends AdminClient { CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, retryBackoffMaxMs, CommonClientConfigs.RETRY_BACKOFF_JITTER); - List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config); this.clientTelemetryReporter = clientTelemetryReporter; - this.clientTelemetryReporter.ifPresent(reporters::add); this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG)); this.partitionLeaderCache = new HashMap<>(); this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);