This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new d95857a1559 KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166) d95857a1559 is described below commit d95857a155943013fe6da5d613b505e6e9688c16 Author: Bill Bejeck <bbej...@apache.org> AuthorDate: Mon Jul 14 20:19:16 2025 -0400 KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166) The `AdminClient` adds a telemetry reporter to the metrics reporters list in the constructor. The problem is that the reporter was already added in the `createInternal` method. In the `createInternal` method call, the `clientTelemetryReporter` is added to a `List<MetricReporters>` which is passed to the `Metrics` object, will get closed when `Metrics.close()` is called. But adding a reporter to the reporters list in the constructor is not used by the `Metrics` object and hence doesn't get closed, causing a memory leak. All related tests pass after this change. Reviewers: Apoorv Mittal <apoorvmitta...@apache.org>, Matthias J. Sax <matth...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>, Jhen-Yung Hsu <jhenyung...@gmail.com> --- .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index dc3164993b8..12e122b123a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -577,10 +577,12 @@ public class KafkaAdminClient extends AdminClient { Time time) { Metrics metrics = null; String clientId = generateClientId(config); + List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config); Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + clientTelemetryReporter.ifPresent(reporters::add); try { - metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); + metrics = new Metrics(new MetricConfig(), reporters, time); LogContext logContext = createLogContext(clientId); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, client, null, logContext, clientTelemetryReporter); @@ -625,9 +627,7 @@ public class KafkaAdminClient extends AdminClient { CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, retryBackoffMaxMs, CommonClientConfigs.RETRY_BACKOFF_JITTER); - List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config); this.clientTelemetryReporter = clientTelemetryReporter; - this.clientTelemetryReporter.ifPresent(reporters::add); this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG)); this.partitionLeaderCache = new HashMap<>(); this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);