This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 907e56d265d KAFKA-19868 Add client id to share consumers in
ShareConsumerPerformance (#20840)
907e56d265d is described below
commit 907e56d265dd59d4eaeb41d499679904e4497a93
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sun Nov 9 12:28:04 2025 +0530
KAFKA-19868 Add client id to share consumers in ShareConsumerPerformance
(#20840)
### About
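To make observation easier, each share consumer now gets its own client id
when the `ShareConsumerPerformance` script is run with `threads` greater
than 1: the configured client id suffixed with `-<n>`, where `<n>` is the
1-based consumer index. With a single thread the configured client id is
used unchanged.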
Screenshot with 3 share consumers - <img width="864" height="288"
alt="Screenshot 2025-11-06 at 6 43 39 PM"
src="https://github.com/user-attachments/assets/039f1fcf-3385-4fbf-bb76-e637a3bd75b3"
/>
Screenshot with 1 share consumer - <img width="862" height="152"
alt="Screenshot 2025-11-06 at 6 43 58 PM"
src="https://github.com/user-attachments/assets/21f52b03-4f3b-48bd-9a41-51ba58ec52fa"
/>
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/tools/ShareConsumerPerformance.java | 80 ++++++++++++++--------
1 file changed, 50 insertions(+), 30 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index 51c66704668..f24670c0adb 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -75,11 +75,21 @@ public class ShareConsumerPerformance {
printHeader();
List<ShareConsumer<byte[], byte[]>> shareConsumers = new
ArrayList<>();
+ List<String> clientIds = new ArrayList<>();
for (int i = 0; i < options.threads(); i++) {
-
shareConsumers.add(shareConsumerCreator.apply(options.props()));
+ if (options.threads() == 1) {
+
clientIds.add(options.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+
shareConsumers.add(shareConsumerCreator.apply(options.props()));
+ break;
+ }
+ Properties shareConsumerProps = options.props();
+ String shareConsumerClientId =
options.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG) + "-" + (i + 1);
+ shareConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,
shareConsumerClientId);
+ clientIds.add(shareConsumerClientId);
+
shareConsumers.add(shareConsumerCreator.apply(shareConsumerProps));
}
long startMs = System.currentTimeMillis();
- consume(shareConsumers, options, totalRecordsRead, totalBytesRead,
startMs);
+ consume(shareConsumers, options, totalRecordsRead, totalBytesRead,
startMs, clientIds);
long endMs = System.currentTimeMillis();
List<Map<MetricName, ? extends Metric>> shareConsumersMetrics =
new ArrayList<>();
@@ -94,8 +104,8 @@ public class ShareConsumerPerformance {
// Print final stats for share group.
double elapsedSec = (endMs - startMs) / 1_000.0;
long fetchTimeInMs = endMs - startMs;
- printStats(totalBytesRead.get(), totalRecordsRead.get(),
elapsedSec, fetchTimeInMs, startMs, endMs,
- options.dateFormat(), -1);
+ printStatsForShareGroup(totalBytesRead.get(),
totalRecordsRead.get(), elapsedSec, fetchTimeInMs, startMs,
+ endMs, options.dateFormat());
shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
@@ -116,7 +126,8 @@ public class ShareConsumerPerformance {
ShareConsumerPerfOptions options,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
- long startMs) throws ExecutionException,
InterruptedException {
+ long startMs,
+ List<String> clientIds) throws
ExecutionException, InterruptedException {
long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
shareConsumers.forEach(shareConsumer ->
shareConsumer.subscribe(options.topic()));
@@ -174,7 +185,7 @@ public class ShareConsumerPerformance {
long fetchTimeInMs = endMs - startMs;
long recordsReadByConsumer =
shareConsumersConsumptionDetails.get(index).recordsConsumed();
long bytesReadByConsumer =
shareConsumersConsumptionDetails.get(index).bytesConsumed();
- printStats(bytesReadByConsumer, recordsReadByConsumer,
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
+ printStatsForShareConsumer(clientIds.get(index),
bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs,
endMs, options.dateFormat(), index + 1);
}
}
@@ -251,31 +262,20 @@ public class ShareConsumerPerformance {
System.out.println();
}
- // Prints stats for both share consumer and share group. For share group,
index is -1. For share consumer,
- // index is >= 1.
- private static void printStats(long bytesRead,
- long recordsRead,
- double elapsedSec,
- long fetchTimeInMs,
- long startMs,
- long endMs,
- SimpleDateFormat dateFormat,
- int index) {
+ private static void printStatsForShareConsumer(
+ String clientId,
+ long bytesRead,
+ long recordsRead,
+ double elapsedSec,
+ long fetchTimeInMs,
+ long startMs,
+ long endMs,
+ SimpleDateFormat dateFormat,
+ int index) {
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
- if (index != -1) {
- System.out.printf("Share consumer %s consumption metrics- %s, %s,
%.4f, %.4f, %.4f, %d, %d%n",
- index,
- dateFormat.format(startMs),
- dateFormat.format(endMs),
- totalMbRead,
- totalMbRead / elapsedSec,
- recordsRead / elapsedSec,
- recordsRead,
- fetchTimeInMs
- );
- return;
- }
- System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+ System.out.printf("Share consumer %s having client id %s consumption
metrics- %s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+ index,
+ clientId,
dateFormat.format(startMs),
dateFormat.format(endMs),
totalMbRead,
@@ -286,6 +286,26 @@ public class ShareConsumerPerformance {
);
}
+ private static void printStatsForShareGroup(
+ long bytesRead,
+ long recordsRead,
+ double elapsedSec,
+ long fetchTimeInMs,
+ long startMs,
+ long endMs,
+ SimpleDateFormat dateFormat) {
+ double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
+ System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+ dateFormat.format(startMs),
+ dateFormat.format(endMs),
+ totalMbRead,
+ totalMbRead / elapsedSec,
+ recordsRead / elapsedSec,
+ recordsRead,
+ fetchTimeInMs
+ );
+ }
+
protected static class ShareConsumerPerfOptions extends
CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicOpt;