This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new d95857a1559 KAFKA-19504: Remove unused metrics reporter initialization 
in KafkaAdminClient (#20166)
d95857a1559 is described below

commit d95857a155943013fe6da5d613b505e6e9688c16
Author: Bill Bejeck <bbej...@apache.org>
AuthorDate: Mon Jul 14 20:19:16 2025 -0400

    KAFKA-19504: Remove unused metrics reporter initialization in 
KafkaAdminClient (#20166)
    
    The `AdminClient` adds a telemetry reporter to the metrics reporters
    list in the constructor.  The problem is that the reporter was already
    added in the `createInternal` method.  In the `createInternal` method
    call, the `clientTelemetryReporter` is added to a
    `List<MetricReporters>` which is passed to the `Metrics` object, will
    get closed when `Metrics.close()` is called.  But adding a reporter to
    the reporters list in the constructor is not used by the `Metrics`
    object and hence doesn't get closed, causing a memory leak.
    
    All related tests pass after this change.
    
    Reviewers: Apoorv Mittal <apoorvmitta...@apache.org>, Matthias J. Sax
     <matth...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>,
     Jhen-Yung Hsu <jhenyung...@gmail.com>
---
 .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dc3164993b8..12e122b123a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -577,10 +577,12 @@ public class KafkaAdminClient extends AdminClient {
                                            Time time) {
         Metrics metrics = null;
         String clientId = generateClientId(config);
+        List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(clientId, config);
         Optional<ClientTelemetryReporter> clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
+        clientTelemetryReporter.ifPresent(reporters::add);
 
         try {
-            metrics = new Metrics(new MetricConfig(), new LinkedList<>(), 
time);
+            metrics = new Metrics(new MetricConfig(), reporters, time);
             LogContext logContext = createLogContext(clientId);
             return new KafkaAdminClient(config, clientId, time, 
metadataManager, metrics,
                 client, null, logContext, clientTelemetryReporter);
@@ -625,9 +627,7 @@ public class KafkaAdminClient extends AdminClient {
             CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
             retryBackoffMaxMs,
             CommonClientConfigs.RETRY_BACKOFF_JITTER);
-        List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(this.clientId, config);
         this.clientTelemetryReporter = clientTelemetryReporter;
-        this.clientTelemetryReporter.ifPresent(reporters::add);
         this.metadataRecoveryStrategy = 
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
         this.partitionLeaderCache = new HashMap<>();
         this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);

Reply via email to