This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b5dd291d705925b513547e35650a7a230c40a25
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue Dec 12 10:47:37 2023 +0800

    [fix][test] Fix PerformanceProducer send count error (#21706)
---
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f6d8dc4f3d1..3f75d38fb1e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -615,7 +615,7 @@ public class PerformanceProducer {
                 }
             }
             // Send messages on all topics/producers
-            long totalSent = 0;
+            AtomicLong totalSent = new AtomicLong(0);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
@@ -635,7 +635,7 @@ public class PerformanceProducer {
                     }
 
                     if (numMessages > 0) {
-                        if (totalSent++ >= numMessages) {
+                        if (totalSent.get() >= numMessages) {
                             log.info("------------- DONE (reached the maximum number: {} of production) --------------"
                                     , numMessages);
                             doneLatch.countDown();
@@ -653,7 +653,7 @@ public class PerformanceProducer {
 
                     if (arguments.payloadFilename != null) {
                         if (messageFormatter != null) {
-                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
+                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent.get(),
                                     payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
                         } else {
                             payloadData = payloadByteList.get(
@@ -687,13 +687,13 @@ public class PerformanceProducer {
                     if (msgKeyMode == MessageKeyGenerationMode.random) {
                         messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
                     } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
-                        messageBuilder.key(String.valueOf(totalSent));
+                        messageBuilder.key(String.valueOf(totalSent.get()));
                     }
                     PulsarClient pulsarClient = client;
                     messageBuilder.sendAsync().thenRun(() -> {
                         bytesSent.add(payloadData.length);
                         messagesSent.increment();
-
+                        totalSent.incrementAndGet();
                         totalMessagesSent.increment();
                         totalBytesSent.add(payloadData.length);
 

Reply via email to