This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4271fd8c8b1 KAFKA-19564: Close Consumer in ConsumerPerformance only 
after metrics displayed (#20267)
4271fd8c8b1 is described below

commit 4271fd8c8b17cc7f31be027a7c25fd31ea11a7d7
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Wed Sep 3 01:25:21 2025 -0700

    KAFKA-19564: Close Consumer in ConsumerPerformance only after metrics 
displayed (#20267)
    
    Ensure that metrics are retrieved and displayed (when requested) before
    `Consumer.close()` is called. This is important because metrics are
    technically supposed to be removed on `Consumer.close()`, which means
    retrieving them _after_ `close()` would yield an empty map.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../apache/kafka/tools/ConsumerPerformance.java    | 84 +++++++++++-----------
 .../kafka/tools/ConsumerPerformanceTest.java       | 21 ++++++
 2 files changed, 63 insertions(+), 42 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 0892693801a..62def15d324 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -16,13 +16,12 @@
  */
 package org.apache.kafka.tools;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
@@ -38,11 +37,11 @@ import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 import joptsimple.OptionException;
@@ -55,6 +54,10 @@ public class ConsumerPerformance {
     private static final Random RND = new Random();
 
     public static void main(String[] args) {
+        run(args, KafkaConsumer::new);
+    }
+
+    static void run(String[] args, Function<Properties, Consumer<byte[], 
byte[]>> consumerCreator) {
         try {
             LOG.info("Starting consumer...");
             ConsumerPerfOptions options = new ConsumerPerfOptions(args);
@@ -66,45 +69,42 @@ public class ConsumerPerformance {
             if (!options.hideHeader())
                 printHeader(options.showDetailedStats());
 
-            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(options.props());
-            long bytesRead = 0L;
-            long messagesRead = 0L;
-            long lastBytesRead = 0L;
-            long lastMessagesRead = 0L;
-            long currentTimeMs = System.currentTimeMillis();
-            long joinStartMs = currentTimeMs;
-            long startMs = currentTimeMs;
-            consume(consumer, options, totalMessagesRead, totalBytesRead, 
joinTimeMs,
-                bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
-                joinStartMs, joinTimeMsInSingleRound);
-            long endMs = System.currentTimeMillis();
-
-            Map<MetricName, ? extends Metric> metrics = null;
-            if (options.printMetrics())
-                metrics = consumer.metrics();
-            consumer.close();
-
-            // print final stats
-            double elapsedSec = (endMs - startMs) / 1_000.0;
-            long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
-            if (!options.showDetailedStats()) {
-                double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 
1024);
-                System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, 
%.4f%n",
-                    options.dateFormat().format(startMs),
-                    options.dateFormat().format(endMs),
-                    totalMbRead,
-                    totalMbRead / elapsedSec,
-                    totalMessagesRead.get(),
-                    totalMessagesRead.get() / elapsedSec,
-                    joinTimeMs.get(),
-                    fetchTimeInMs,
-                    totalMbRead / (fetchTimeInMs / 1000.0),
-                    totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
-                );
-            }
+            try (Consumer<byte[], byte[]> consumer = 
consumerCreator.apply(options.props())) {
+                long bytesRead = 0L;
+                long messagesRead = 0L;
+                long lastBytesRead = 0L;
+                long lastMessagesRead = 0L;
+                long currentTimeMs = System.currentTimeMillis();
+                long joinStartMs = currentTimeMs;
+                long startMs = currentTimeMs;
+                consume(consumer, options, totalMessagesRead, totalBytesRead, 
joinTimeMs,
+                    bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+                    joinStartMs, joinTimeMsInSingleRound);
+                long endMs = System.currentTimeMillis();
+
+                // print final stats
+                double elapsedSec = (endMs - startMs) / 1_000.0;
+                long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
+                if (!options.showDetailedStats()) {
+                    double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 
* 1024);
+                    System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, 
%.4f, %.4f%n",
+                        options.dateFormat().format(startMs),
+                        options.dateFormat().format(endMs),
+                        totalMbRead,
+                        totalMbRead / elapsedSec,
+                        totalMessagesRead.get(),
+                        totalMessagesRead.get() / elapsedSec,
+                        joinTimeMs.get(),
+                        fetchTimeInMs,
+                        totalMbRead / (fetchTimeInMs / 1000.0),
+                        totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+                    );
+                }
 
-            if (metrics != null)
-                ToolsUtils.printMetrics(metrics);
+                if (options.printMetrics()) {
+                    ToolsUtils.printMetrics(consumer.metrics());
+                }
+            }
         } catch (Throwable e) {
             System.err.println(e.getMessage());
             System.err.println(Utils.stackTrace(e));
@@ -120,7 +120,7 @@ public class ConsumerPerformance {
             System.out.printf("time, threadId, data.consumed.in.MB, MB.sec, 
data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
     }
 
-    private static void consume(KafkaConsumer<byte[], byte[]> consumer,
+    private static void consume(Consumer<byte[], byte[]> consumer,
                                 ConsumerPerfOptions options,
                                 AtomicLong totalMessagesRead,
                                 AtomicLong totalBytesRead,
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index d78b65e54a3..df6c3a93966 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.tools;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -30,6 +34,8 @@ import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
+import java.util.Properties;
+import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -167,6 +173,21 @@ public class ConsumerPerformanceTest {
         assertEquals("perf-consumer-client", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testMetricsRetrievedBeforeConsumerClosed() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "0",
+            "--print-metrics"
+        };
+
+        Function<Properties, Consumer<byte[], byte[]>> consumerCreator = 
properties -> new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name());
+
+        String err = ToolsTestUtils.captureStandardErr(() -> 
ConsumerPerformance.run(args, consumerCreator));
+        assertTrue(Utils.isBlank(err), "Should be no stderr message, but was 
\"" + err + "\"");
+    }
+
     private void testHeaderMatchContent(boolean detailed, int 
expectedOutputLineCount, Runnable runnable) {
         String out = ToolsTestUtils.captureStandardOut(() -> {
             ConsumerPerformance.printHeader(detailed);

Reply via email to