This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e588122  [testclient] Add new option --num-messages for consumer and 
reader (#12016)
e588122 is described below

commit e5881222c3c922db933afba2fb3a639d2c99583c
Author: Ruguo Yu <[email protected]>
AuthorDate: Tue Sep 14 14:10:43 2021 +0800

    [testclient] Add new option --num-messages for consumer and reader (#12016)
---
 .../apache/pulsar/proxy/socket/client/PerformanceClient.java  |  2 +-
 .../org/apache/pulsar/testclient/ManagedLedgerWriter.java     | 11 +++++------
 .../org/apache/pulsar/testclient/PerformanceConsumer.java     | 11 ++++++++++-
 .../org/apache/pulsar/testclient/PerformanceProducer.java     |  4 ++--
 .../java/org/apache/pulsar/testclient/PerformanceReader.java  | 11 ++++++++++-
 site2/docs/reference-cli-tools.md                             |  2 ++
 6 files changed, 30 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..e147d6d 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -248,7 +248,7 @@ public class PerformanceClient {
                     for (String topic : producersMap.keySet()) {
                         if (messages > 0) {
                             if (totalSent >= messages) {
-                                log.trace("------------------- DONE 
-----------------------");
+                                log.trace("------------- DONE (reached the 
maximum number: [{}] of production) --------------", messages);
                                 Thread.sleep(10000);
                                 PerfClientUtils.exit(0);
                             }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 1520c92..950406d 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -150,8 +150,6 @@ public class ManagedLedgerWriter {
             PerfClientUtils.exit(-1);
         }
 
-        arguments.testTime = TimeUnit.SECONDS.toMillis(arguments.testTime);
-
         // Dump config variables
         PerfClientUtils.printJVMInformation(log);
         ObjectMapper m = new ObjectMapper();
@@ -240,7 +238,8 @@ public class ManagedLedgerWriter {
 
                     // Acquire 1 sec worth of messages to have a slower ramp-up
                     rateLimiter.acquire((int) msgRate);
-                    final long startTime = System.currentTimeMillis();
+                    final long startTime = System.nanoTime();
+                    final long testEndTime = startTime + (long) 
(arguments.testTime * 1e9);
 
                     final Semaphore semaphore = new 
Semaphore(maxOutstandingForThisThread);
 
@@ -270,8 +269,8 @@ public class ManagedLedgerWriter {
                     while (true) {
                         for (int j = 0; j < nunManagedLedgersForThisThread; 
j++) {
                             if (arguments.testTime > 0) {
-                                if (System.currentTimeMillis() - startTime > 
arguments.testTime) {
-                                    log.info("------------------- DONE 
-----------------------");
+                                if (System.nanoTime() > testEndTime) {
+                                    log.info("------------- DONE (reached the 
maximum duration: [{} seconds] of production) --------------", 
arguments.testTime);
                                     printAggregatedStats();
                                     isDone.set(true);
                                     Thread.sleep(5000);
@@ -281,7 +280,7 @@ public class ManagedLedgerWriter {
 
                             if (numMessagesForThisThread > 0) {
                                 if (totalSent++ >= numMessagesForThisThread) {
-                                    log.info("------------------- DONE 
-----------------------");
+                                    log.info("------------- DONE (reached the 
maximum number: [{}] of production) --------------", numMessagesForThisThread);
                                     printAggregatedStats();
                                     isDone.set(true);
                                     Thread.sleep(5000);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 32bc8b3..49eb0c9 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -118,6 +118,10 @@ public class PerformanceConsumer {
         @Parameter(names = { "--acks-delay-millis" }, description = 
"Acknowledgements grouping delay in millis")
         public int acknowledgmentsGroupingDelayMillis = 100;
 
+        @Parameter(names = {"-m",
+                "--num-messages"}, description = "Number of messages to 
consume in total. If <= 0, it will keep consuming")
+        public long numMessages = 0;
+
         @Parameter(names = { "-c",
                 "--max-connections" }, description = "Max number of TCP 
connections to a single broker")
         public int maxConnections = 100;
@@ -288,11 +292,16 @@ public class PerformanceConsumer {
         MessageListener<ByteBuffer> listener = (consumer, msg) -> {
             if (arguments.testTime > 0) {
                 if (System.nanoTime() > testEndTime) {
-                    log.info("------------------- DONE 
-----------------------");
+                    log.info("------------- DONE (reached the maximum 
duration: [{} seconds] of consumption) --------------", arguments.testTime);
                     printAggregatedStats();
                     PerfClientUtils.exit(0);
                 }
             }
+            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
+                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------", arguments.numMessages);
+                printAggregatedStats();
+                PerfClientUtils.exit(0);
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.size());
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f198a2d..379464c 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -601,7 +601,7 @@ public class PerformanceProducer {
                 for (Producer<byte[]> producer : producers) {
                     if (arguments.testTime > 0) {
                         if (System.nanoTime() > testEndTime) {
-                            log.info("------------------- DONE 
-----------------------");
+                            log.info("------------- DONE (reached the maximum 
duration: [{} seconds] of production) --------------", arguments.testTime);
                             printAggregatedStats();
                             doneLatch.countDown();
                             Thread.sleep(5000);
@@ -611,7 +611,7 @@ public class PerformanceProducer {
 
                     if (numMessages > 0) {
                         if (totalSent++ >= numMessages) {
-                            log.info("------------------- DONE 
-----------------------");
+                            log.info("------------- DONE (reached the maximum 
number: {} of production) --------------", numMessages);
                             printAggregatedStats();
                             doneLatch.countDown();
                             Thread.sleep(5000);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c4becf0..91d840e 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -85,6 +85,10 @@ public class PerformanceReader {
         @Parameter(names = { "-q", "--receiver-queue-size" }, description = 
"Size of the receiver queue")
         public int receiverQueueSize = 1000;
 
+        @Parameter(names = {"-n",
+                "--num-messages"}, description = "Number of messages to 
consume in total. If <= 0, it will keep consuming")
+        public long numMessages = 0;
+
         @Parameter(names = { "-c",
                 "--max-connections" }, description = "Max number of TCP 
connections to a single broker")
         public int maxConnections = 100;
@@ -215,10 +219,15 @@ public class PerformanceReader {
         ReaderListener<byte[]> listener = (reader, msg) -> {
             if (arguments.testTime > 0) {
                 if (System.nanoTime() > testEndTime) {
-                    log.info("------------------- DONE 
-----------------------");
+                    log.info("------------- DONE (reached the maximum 
duration: [{} seconds] of consumption) --------------", arguments.testTime);
                     PerfClientUtils.exit(0);
                 }
             }
+            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
+                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------", arguments.numMessages);
+                printAggregatedStats();
+                PerfClientUtils.exit(0);
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
diff --git a/site2/docs/reference-cli-tools.md 
b/site2/docs/reference-cli-tools.md
index d4ba2c5..cc8b20a 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -437,6 +437,7 @@ Options
 |`-v`, `--encryption-key-value-file`|The file which contains the private key 
to decrypt payload||
 |`-h`, `--help`|Help message|false|
 |`--conf-file`|Configuration file||
+|`-m`, `--num-messages`|Number of messages to consume in total. If the value 
is equal to or smaller than 0, it keeps consuming messages.|0|
 |`-c`, `--max-connections`|Max number of TCP connections to a single 
broker|100|
 |`-n`, `--num-consumers`|Number of consumers (per topic)|1|
 |`-t`, `--num-topics`|The number of topics|1|
@@ -508,6 +509,7 @@ Options
 |`--listener-name`|Listener name for the broker||
 |`--conf-file`|Configuration file||
 |`-h`, `--help`|Help message|false|
+|`-n`, `--num-messages`|Number of messages to consume in total. If the value 
is equal to or smaller than 0, it keeps consuming messages.|0|
 |`-c`, `--max-connections`|Max number of TCP connections to a single 
broker|100|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|

Reply via email to