This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 907e56d265d KAFKA-19868 Add client id to share consumers in 
ShareConsumerPerformance (#20840)
907e56d265d is described below

commit 907e56d265dd59d4eaeb41d499679904e4497a93
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sun Nov 9 12:28:04 2025 +0530

    KAFKA-19868 Add client id to share consumers in ShareConsumerPerformance 
(#20840)
    
    ### About
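    To make the output easier to observe, each share consumer is now given a
    distinct client id (the configured client id suffixed with "-<n>") when
    `threads` is greater than 1 while running the `ShareConsumerPerformance`
    script. With a single thread, the configured client id is used unchanged.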
    
    Screenshot with 3 share consumers - <img width="864" height="288"
    alt="Screenshot 2025-11-06 at 6 43 39 PM"
    src="https://github.com/user-attachments/assets/039f1fcf-3385-4fbf-bb76-e637a3bd75b3"
    />
    
    Screenshot with 1 share consumer - <img width="862" height="152"
    alt="Screenshot 2025-11-06 at 6 43 58 PM"
    src="https://github.com/user-attachments/assets/21f52b03-4f3b-48bd-9a41-51ba58ec52fa"
    />
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/tools/ShareConsumerPerformance.java      | 80 ++++++++++++++--------
 1 file changed, 50 insertions(+), 30 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index 51c66704668..f24670c0adb 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -75,11 +75,21 @@ public class ShareConsumerPerformance {
                 printHeader();
 
             List<ShareConsumer<byte[], byte[]>> shareConsumers = new 
ArrayList<>();
+            List<String> clientIds = new ArrayList<>();
             for (int i = 0; i < options.threads(); i++) {
-                
shareConsumers.add(shareConsumerCreator.apply(options.props()));
+                if (options.threads() == 1) {
+                    
clientIds.add(options.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+                    
shareConsumers.add(shareConsumerCreator.apply(options.props()));
+                    break;
+                }
+                Properties shareConsumerProps = options.props();
+                String shareConsumerClientId = 
options.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG) + "-" + (i + 1);
+                shareConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, 
shareConsumerClientId);
+                clientIds.add(shareConsumerClientId);
+                
shareConsumers.add(shareConsumerCreator.apply(shareConsumerProps));
             }
             long startMs = System.currentTimeMillis();
-            consume(shareConsumers, options, totalRecordsRead, totalBytesRead, 
startMs);
+            consume(shareConsumers, options, totalRecordsRead, totalBytesRead, 
startMs, clientIds);
             long endMs = System.currentTimeMillis();
 
             List<Map<MetricName, ? extends Metric>> shareConsumersMetrics = 
new ArrayList<>();
@@ -94,8 +104,8 @@ public class ShareConsumerPerformance {
             // Print final stats for share group.
             double elapsedSec = (endMs - startMs) / 1_000.0;
             long fetchTimeInMs = endMs - startMs;
-            printStats(totalBytesRead.get(), totalRecordsRead.get(), 
elapsedSec, fetchTimeInMs, startMs, endMs,
-                    options.dateFormat(), -1);
+            printStatsForShareGroup(totalBytesRead.get(), 
totalRecordsRead.get(), elapsedSec, fetchTimeInMs, startMs,
+                endMs, options.dateFormat());
 
             shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
 
@@ -116,7 +126,8 @@ public class ShareConsumerPerformance {
                                 ShareConsumerPerfOptions options,
                                 AtomicLong totalRecordsRead,
                                 AtomicLong totalBytesRead,
-                                long startMs) throws ExecutionException, 
InterruptedException {
+                                long startMs,
+                                List<String> clientIds) throws 
ExecutionException, InterruptedException {
         long numRecords = options.numRecords();
         long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
         shareConsumers.forEach(shareConsumer -> 
shareConsumer.subscribe(options.topic()));
@@ -174,7 +185,7 @@ public class ShareConsumerPerformance {
                 long fetchTimeInMs = endMs - startMs;
                 long recordsReadByConsumer = 
shareConsumersConsumptionDetails.get(index).recordsConsumed();
                 long bytesReadByConsumer = 
shareConsumersConsumptionDetails.get(index).bytesConsumed();
-                printStats(bytesReadByConsumer, recordsReadByConsumer, 
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
+                printStatsForShareConsumer(clientIds.get(index), 
bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs, 
endMs, options.dateFormat(), index + 1);
             }
         }
 
@@ -251,31 +262,20 @@ public class ShareConsumerPerformance {
         System.out.println();
     }
 
-    // Prints stats for both share consumer and share group. For share group, 
index is -1. For share consumer,
-    // index is >= 1.
-    private static void printStats(long bytesRead,
-                                   long recordsRead,
-                                   double elapsedSec,
-                                   long fetchTimeInMs,
-                                   long startMs,
-                                   long endMs,
-                                   SimpleDateFormat dateFormat,
-                                   int index) {
+    private static void printStatsForShareConsumer(
+        String clientId,
+        long bytesRead,
+        long recordsRead,
+        double elapsedSec,
+        long fetchTimeInMs,
+        long startMs,
+        long endMs,
+        SimpleDateFormat dateFormat,
+        int index) {
         double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
-        if (index != -1) {
-            System.out.printf("Share consumer %s consumption metrics- %s, %s, 
%.4f, %.4f, %.4f, %d, %d%n",
-                    index,
-                    dateFormat.format(startMs),
-                    dateFormat.format(endMs),
-                    totalMbRead,
-                    totalMbRead / elapsedSec,
-                    recordsRead / elapsedSec,
-                    recordsRead,
-                    fetchTimeInMs
-            );
-            return;
-        }
-        System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+        System.out.printf("Share consumer %s having client id %s consumption 
metrics- %s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+                index,
+                clientId,
                 dateFormat.format(startMs),
                 dateFormat.format(endMs),
                 totalMbRead,
@@ -286,6 +286,26 @@ public class ShareConsumerPerformance {
         );
     }
 
+    private static void printStatsForShareGroup(
+        long bytesRead,
+        long recordsRead,
+        double elapsedSec,
+        long fetchTimeInMs,
+        long startMs,
+        long endMs,
+        SimpleDateFormat dateFormat) {
+        double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
+        System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d%n",
+            dateFormat.format(startMs),
+            dateFormat.format(endMs),
+            totalMbRead,
+            totalMbRead / elapsedSec,
+            recordsRead / elapsedSec,
+            recordsRead,
+            fetchTimeInMs
+        );
+    }
+
     protected static class ShareConsumerPerfOptions extends 
CommandDefaultOptions {
         private final OptionSpec<String> bootstrapServerOpt;
         private final OptionSpec<String> topicOpt;

Reply via email to