This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5b5dd291d705925b513547e35650a7a230c40a25
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue Dec 12 10:47:37 2023 +0800

    [fix][test] Fix PerformanceProducer send count error (#21706)
---
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f6d8dc4f3d1..3f75d38fb1e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -615,7 +615,7 @@ public class PerformanceProducer {
                 }
             }
             // Send messages on all topics/producers
-            long totalSent = 0;
+            AtomicLong totalSent = new AtomicLong(0);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
@@ -635,7 +635,7 @@ public class PerformanceProducer {
                     }
 
                     if (numMessages > 0) {
-                        if (totalSent++ >= numMessages) {
+                        if (totalSent.get() >= numMessages) {
                             log.info("------------- DONE (reached the maximum number: {} of production) --------------"
                                     , numMessages);
                             doneLatch.countDown();
@@ -653,7 +653,7 @@ public class PerformanceProducer {
 
                     if (arguments.payloadFilename != null) {
                         if (messageFormatter != null) {
-                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
+                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent.get(),
                                     payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
                         } else {
                             payloadData = payloadByteList.get(
@@ -687,13 +687,13 @@ public class PerformanceProducer {
                     if (msgKeyMode == MessageKeyGenerationMode.random) {
                         messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
                     } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
-                        messageBuilder.key(String.valueOf(totalSent));
+                        messageBuilder.key(String.valueOf(totalSent.get()));
                     }
                     PulsarClient pulsarClient = client;
                     messageBuilder.sendAsync().thenRun(() -> {
                         bytesSent.add(payloadData.length);
                         messagesSent.increment();
-
+                        totalSent.incrementAndGet();
                         totalMessagesSent.increment();
                         totalBytesSent.add(payloadData.length);