This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e588122 [testclient] Add new option --num-messages for consumer and
reader (#12016)
e588122 is described below
commit e5881222c3c922db933afba2fb3a639d2c99583c
Author: Ruguo Yu <[email protected]>
AuthorDate: Tue Sep 14 14:10:43 2021 +0800
[testclient] Add new option --num-messages for consumer and reader (#12016)
---
.../apache/pulsar/proxy/socket/client/PerformanceClient.java | 2 +-
.../org/apache/pulsar/testclient/ManagedLedgerWriter.java | 11 +++++------
.../org/apache/pulsar/testclient/PerformanceConsumer.java | 11 ++++++++++-
.../org/apache/pulsar/testclient/PerformanceProducer.java | 4 ++--
.../java/org/apache/pulsar/testclient/PerformanceReader.java | 11 ++++++++++-
site2/docs/reference-cli-tools.md | 2 ++
6 files changed, 30 insertions(+), 11 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..e147d6d 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -248,7 +248,7 @@ public class PerformanceClient {
for (String topic : producersMap.keySet()) {
if (messages > 0) {
if (totalSent >= messages) {
- log.trace("------------------- DONE
-----------------------");
+ log.trace("------------- DONE (reached the
maximum number: [{}] of production) --------------", messages);
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 1520c92..950406d 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -150,8 +150,6 @@ public class ManagedLedgerWriter {
PerfClientUtils.exit(-1);
}
- arguments.testTime = TimeUnit.SECONDS.toMillis(arguments.testTime);
-
// Dump config variables
PerfClientUtils.printJVMInformation(log);
ObjectMapper m = new ObjectMapper();
@@ -240,7 +238,8 @@ public class ManagedLedgerWriter {
// Acquire 1 sec worth of messages to have a slower ramp-up
rateLimiter.acquire((int) msgRate);
- final long startTime = System.currentTimeMillis();
+ final long startTime = System.nanoTime();
+ final long testEndTime = startTime + (long)
(arguments.testTime * 1e9);
final Semaphore semaphore = new
Semaphore(maxOutstandingForThisThread);
@@ -270,8 +269,8 @@ public class ManagedLedgerWriter {
while (true) {
for (int j = 0; j < nunManagedLedgersForThisThread;
j++) {
if (arguments.testTime > 0) {
- if (System.currentTimeMillis() - startTime >
arguments.testTime) {
- log.info("------------------- DONE
-----------------------");
+ if (System.nanoTime() > testEndTime) {
+ log.info("------------- DONE (reached the
maximum duration: [{} seconds] of production) --------------",
arguments.testTime);
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
@@ -281,7 +280,7 @@ public class ManagedLedgerWriter {
if (numMessagesForThisThread > 0) {
if (totalSent++ >= numMessagesForThisThread) {
- log.info("------------------- DONE
-----------------------");
+ log.info("------------- DONE (reached the
maximum number: [{}] of production) --------------", numMessagesForThisThread);
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 32bc8b3..49eb0c9 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -118,6 +118,10 @@ public class PerformanceConsumer {
@Parameter(names = { "--acks-delay-millis" }, description =
"Acknowledgements grouping delay in millis")
public int acknowledgmentsGroupingDelayMillis = 100;
+ @Parameter(names = {"-m",
+ "--num-messages"}, description = "Number of messages to
consume in total. If <= 0, it will keep consuming")
+ public long numMessages = 0;
+
@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP
connections to a single broker")
public int maxConnections = 100;
@@ -288,11 +292,16 @@ public class PerformanceConsumer {
MessageListener<ByteBuffer> listener = (consumer, msg) -> {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
- log.info("------------------- DONE
-----------------------");
+ log.info("------------- DONE (reached the maximum
duration: [{} seconds] of consumption) --------------", arguments.testTime);
printAggregatedStats();
PerfClientUtils.exit(0);
}
}
+ if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
+ log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------", arguments.numMessages);
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ }
messagesReceived.increment();
bytesReceived.add(msg.size());
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f198a2d..379464c 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -601,7 +601,7 @@ public class PerformanceProducer {
for (Producer<byte[]> producer : producers) {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
- log.info("------------------- DONE
-----------------------");
+ log.info("------------- DONE (reached the maximum
duration: [{} seconds] of production) --------------", arguments.testTime);
printAggregatedStats();
doneLatch.countDown();
Thread.sleep(5000);
@@ -611,7 +611,7 @@ public class PerformanceProducer {
if (numMessages > 0) {
if (totalSent++ >= numMessages) {
- log.info("------------------- DONE
-----------------------");
+ log.info("------------- DONE (reached the maximum
number: {} of production) --------------", numMessages);
printAggregatedStats();
doneLatch.countDown();
Thread.sleep(5000);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c4becf0..91d840e 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -85,6 +85,10 @@ public class PerformanceReader {
@Parameter(names = { "-q", "--receiver-queue-size" }, description =
"Size of the receiver queue")
public int receiverQueueSize = 1000;
+ @Parameter(names = {"-n",
+ "--num-messages"}, description = "Number of messages to
consume in total. If <= 0, it will keep consuming")
+ public long numMessages = 0;
+
@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP
connections to a single broker")
public int maxConnections = 100;
@@ -215,10 +219,15 @@ public class PerformanceReader {
ReaderListener<byte[]> listener = (reader, msg) -> {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
- log.info("------------------- DONE
-----------------------");
+ log.info("------------- DONE (reached the maximum
duration: [{} seconds] of consumption) --------------", arguments.testTime);
PerfClientUtils.exit(0);
}
}
+ if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
+ log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------", arguments.numMessages);
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ }
messagesReceived.increment();
bytesReceived.add(msg.getData().length);
diff --git a/site2/docs/reference-cli-tools.md
b/site2/docs/reference-cli-tools.md
index d4ba2c5..cc8b20a 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -437,6 +437,7 @@ Options
|`-v`, `--encryption-key-value-file`|The file which contains the private key
to decrypt payload||
|`-h`, `--help`|Help message|false|
|`--conf-file`|Configuration file||
+|`-m`, `--num-messages`|Number of messages to consume in total. If the value
is equal to or smaller than 0, it keeps consuming messages.|0|
|`-c`, `--max-connections`|Max number of TCP connections to a single
broker|100|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
|`-t`, `--num-topics`|The number of topics|1|
@@ -508,6 +509,7 @@ Options
|`--listener-name`|Listener name for the broker||
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
+|`-n`, `--num-messages`|Number of messages to consume in total. If the value
is equal to or smaller than 0, it keeps consuming messages.|0|
|`-c`, `--max-connections`|Max number of TCP connections to a single
broker|100|
|`-t`, `--num-topics`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|