This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 25bfc4668ae [fix] [Perf] PerformanceProducer do not produce expected number of messages. (#19775)
25bfc4668ae is described below

commit 25bfc4668ae31effb784c4ca04443924ec5b27e1
Author: thetumbled <[email protected]>
AuthorDate: Tue Jun 13 08:19:28 2023 +0800

    [fix] [Perf] PerformanceProducer do not produce expected number of messages. (#19775)
    
    Co-authored-by: tison <[email protected]>
---
 .../org/apache/pulsar/testclient/PerformanceProducer.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 554d57972c9..762ad61b837 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -547,6 +547,7 @@ public class PerformanceProducer {
 
             oldTime = now;
         }
+        PerfClientUtils.exit(0);
     }
 
     static IMessageFormatter getMessageFormatter(String formatterClass) {
@@ -568,6 +569,7 @@ public class PerformanceProducer {
                                     Random random,
                                     CountDownLatch doneLatch) {
         PulsarClient client = null;
+        boolean produceEnough = false;
         try {
             // Now processing command line arguments
             List<Future<Producer<byte[]>>> futures = new ArrayList<>();
@@ -683,6 +685,9 @@ public class PerformanceProducer {
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new 
Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
+                if (produceEnough) {
+                    break;
+                }
                 for (Producer<byte[]> producer : producers) {
                     if (arguments.testTime > 0) {
                         if (System.nanoTime() > testEndTime) {
@@ -690,7 +695,8 @@ public class PerformanceProducer {
                                     + "--------------", arguments.testTime);
                             doneLatch.countDown();
                             Thread.sleep(5000);
-                            PerfClientUtils.exit(0);
+                            produceEnough = true;
+                            break;
                         }
                     }
 
@@ -700,7 +706,8 @@ public class PerformanceProducer {
                                     , numMessages);
                             doneLatch.countDown();
                             Thread.sleep(5000);
-                            PerfClientUtils.exit(0);
+                            produceEnough = true;
+                            break;
                         }
                     }
                     rateLimiter.acquire();
@@ -825,10 +832,12 @@ public class PerformanceProducer {
         } catch (Throwable t) {
             log.error("Got error", t);
         } finally {
+            if (!produceEnough) {
+                doneLatch.countDown();
+            }
             if (null != client) {
                 try {
                     client.close();
-                    PerfClientUtils.exit(-1);
                 } catch (PulsarClientException e) {
                     log.error("Failed to close test client", e);
                 }

Reply via email to