This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4271fd8c8b1 KAFKA-19564: Close Consumer in ConsumerPerformance only after metrics displayed (#20267) 4271fd8c8b1 is described below commit 4271fd8c8b17cc7f31be027a7c25fd31ea11a7d7 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Wed Sep 3 01:25:21 2025 -0700 KAFKA-19564: Close Consumer in ConsumerPerformance only after metrics displayed (#20267) Ensure that metrics are retrieved and displayed (when requested) before `Consumer.close()` is called. This is important because metrics are technically supposed to be removed on `Consumer.close()`, which means retrieving them _after_ `close()` would yield an empty map. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../apache/kafka/tools/ConsumerPerformance.java | 84 +++++++++++----------- .../kafka/tools/ConsumerPerformanceTest.java | 21 ++++++ 2 files changed, 63 insertions(+), 42 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 0892693801a..62def15d324 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -16,13 +16,12 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; @@ -38,11 +37,11 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Random; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.regex.Pattern; import joptsimple.OptionException; @@ -55,6 +54,10 @@ public class ConsumerPerformance { private static final Random RND = new Random(); public static void main(String[] args) { + run(args, KafkaConsumer::new); + } + + static void run(String[] args, Function<Properties, Consumer<byte[], byte[]>> consumerCreator) { try { LOG.info("Starting consumer..."); ConsumerPerfOptions options = new ConsumerPerfOptions(args); @@ -66,45 +69,42 @@ public class ConsumerPerformance { if (!options.hideHeader()) printHeader(options.showDetailedStats()); - KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props()); - long bytesRead = 0L; - long messagesRead = 0L; - long lastBytesRead = 0L; - long lastMessagesRead = 0L; - long currentTimeMs = System.currentTimeMillis(); - long joinStartMs = currentTimeMs; - long startMs = currentTimeMs; - consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, - bytesRead, messagesRead, lastBytesRead, lastMessagesRead, - joinStartMs, joinTimeMsInSingleRound); - long endMs = System.currentTimeMillis(); - - Map<MetricName, ? extends Metric> metrics = null; - if (options.printMetrics()) - metrics = consumer.metrics(); - consumer.close(); - - // print final stats - double elapsedSec = (endMs - startMs) / 1_000.0; - long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get(); - if (!options.showDetailedStats()) { - double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 1024); - System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n", - options.dateFormat().format(startMs), - options.dateFormat().format(endMs), - totalMbRead, - totalMbRead / elapsedSec, - totalMessagesRead.get(), - totalMessagesRead.get() / elapsedSec, - joinTimeMs.get(), - fetchTimeInMs, - totalMbRead / (fetchTimeInMs / 1000.0), - totalMessagesRead.get() / (fetchTimeInMs / 1000.0) - ); - } + try (Consumer<byte[], byte[]> consumer = consumerCreator.apply(options.props())) { + long bytesRead = 0L; + long messagesRead = 0L; + long lastBytesRead = 0L; + long lastMessagesRead = 0L; + long currentTimeMs = System.currentTimeMillis(); + long joinStartMs = currentTimeMs; + long startMs = currentTimeMs; + consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, + bytesRead, messagesRead, lastBytesRead, lastMessagesRead, + joinStartMs, joinTimeMsInSingleRound); + long endMs = System.currentTimeMillis(); + + // print final stats + double elapsedSec = (endMs - startMs) / 1_000.0; + long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get(); + if (!options.showDetailedStats()) { + double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 1024); + System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n", + options.dateFormat().format(startMs), + options.dateFormat().format(endMs), + totalMbRead, + totalMbRead / elapsedSec, + totalMessagesRead.get(), + totalMessagesRead.get() / elapsedSec, + joinTimeMs.get(), + fetchTimeInMs, + totalMbRead / (fetchTimeInMs / 1000.0), + totalMessagesRead.get() / (fetchTimeInMs / 1000.0) + ); + } - if (metrics != null) - ToolsUtils.printMetrics(metrics); + if (options.printMetrics()) { + ToolsUtils.printMetrics(consumer.metrics()); + } + } } catch (Throwable e) { System.err.println(e.getMessage()); System.err.println(Utils.stackTrace(e)); @@ -120,7 +120,7 @@ public class ConsumerPerformance { System.out.printf("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader); } - private static void consume(KafkaConsumer<byte[], byte[]> consumer, + private static void consume(Consumer<byte[], byte[]> consumer, ConsumerPerfOptions options, AtomicLong totalMessagesRead, AtomicLong totalBytesRead, diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index d78b65e54a3..df6c3a93966 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -16,8 +16,12 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -30,6 +34,8 @@ import java.io.PrintWriter; import java.nio.file.Files; import java.nio.file.Path; import java.text.SimpleDateFormat; +import java.util.Properties; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -167,6 +173,21 @@ public class ConsumerPerformanceTest { assertEquals("perf-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); } + @Test + public void testMetricsRetrievedBeforeConsumerClosed() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "0", + "--print-metrics" + }; + + Function<Properties, Consumer<byte[], byte[]>> consumerCreator = properties -> new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); + + String err = ToolsTestUtils.captureStandardErr(() -> ConsumerPerformance.run(args, consumerCreator)); + assertTrue(Utils.isBlank(err), "Should be no stderr message, but was \"" + err + "\""); + } + private void testHeaderMatchContent(boolean detailed, int expectedOutputLineCount, Runnable runnable) { String out = ToolsTestUtils.captureStandardOut(() -> { ConsumerPerformance.printHeader(detailed);