This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new dd08831c5aa [fix][test] Fix PerformanceProducer send count error
(#21706)
dd08831c5aa is described below
commit dd08831c5aa7811a375e0baeed140173f7bd1b7f
Author: Jiwe Guo <[email protected]>
AuthorDate: Mon Jan 22 22:10:00 2024 +0800
[fix][test] Fix PerformanceProducer send count error (#21706)
---
.../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 93009ed0728..f989d854c79 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -578,7 +578,7 @@ public class PerformanceProducer {
}
}
// Send messages on all topics/producers
- long totalSent = 0;
+ AtomicLong totalSent = new AtomicLong(0);
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new
Semaphore(arguments.numMessagesPerTransaction);
while (true) {
@@ -598,7 +598,7 @@ public class PerformanceProducer {
}
if (numMessages > 0) {
- if (totalSent++ >= numMessages) {
+ if (totalSent.get() >= numMessages) {
log.info("------------- DONE (reached the maximum
number: {} of production) --------------"
, numMessages);
doneLatch.countDown();
@@ -616,7 +616,7 @@ public class PerformanceProducer {
if (arguments.payloadFilename != null) {
if (messageFormatter != null) {
- payloadData =
messageFormatter.formatMessage(arguments.producerName, totalSent,
+ payloadData =
messageFormatter.formatMessage(arguments.producerName, totalSent.get(),
payloadByteList.get(random.nextInt(payloadByteList.size())));
} else {
payloadData =
payloadByteList.get(random.nextInt(payloadByteList.size()));
@@ -646,13 +646,13 @@ public class PerformanceProducer {
if (msgKeyMode == MessageKeyGenerationMode.random) {
messageBuilder.key(String.valueOf(random.nextInt()));
} else if (msgKeyMode ==
MessageKeyGenerationMode.autoIncrement) {
- messageBuilder.key(String.valueOf(totalSent));
+ messageBuilder.key(String.valueOf(totalSent.get()));
}
PulsarClient pulsarClient = client;
messageBuilder.sendAsync().thenRun(() -> {
bytesSent.add(payloadData.length);
messagesSent.increment();
-
+ totalSent.incrementAndGet();
totalMessagesSent.increment();
totalBytesSent.add(payloadData.length);