This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new e51bb7bc741 [fix] [Perf] PerformanceProducer do not produce expected 
number of messages. (#19775)
e51bb7bc741 is described below

commit e51bb7bc7412ef48651609eb62449fdd1debed9d
Author: thetumbled <[email protected]>
AuthorDate: Tue Jun 13 08:19:28 2023 +0800

    [fix] [Perf] PerformanceProducer do not produce expected number of 
messages. (#19775)
    
    Co-authored-by: tison <[email protected]>
---
 .../org/apache/pulsar/testclient/PerformanceProducer.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 446382f4dff..93009ed0728 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -462,6 +462,7 @@ public class PerformanceProducer {
 
             oldTime = now;
         }
+        PerfClientUtils.exit(0);
     }
 
     static IMessageFormatter getMessageFormatter(String formatterClass) {
@@ -483,6 +484,7 @@ public class PerformanceProducer {
                                     Random random,
                                     CountDownLatch doneLatch) {
         PulsarClient client = null;
+        boolean produceEnough = false;
         try {
             // Now processing command line arguments
             List<Future<Producer<byte[]>>> futures = new ArrayList<>();
@@ -580,6 +582,9 @@ public class PerformanceProducer {
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new 
Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
+                if (produceEnough) {
+                    break;
+                }
                 for (Producer<byte[]> producer : producers) {
                     if (arguments.testTime > 0) {
                         if (System.nanoTime() > testEndTime) {
@@ -587,7 +592,8 @@ public class PerformanceProducer {
                                     + "--------------", arguments.testTime);
                             doneLatch.countDown();
                             Thread.sleep(5000);
-                            PerfClientUtils.exit(0);
+                            produceEnough = true;
+                            break;
                         }
                     }
 
@@ -597,7 +603,8 @@ public class PerformanceProducer {
                                     , numMessages);
                             doneLatch.countDown();
                             Thread.sleep(5000);
-                            PerfClientUtils.exit(0);
+                            produceEnough = true;
+                            break;
                         }
                     }
                     rateLimiter.acquire();
@@ -722,10 +729,12 @@ public class PerformanceProducer {
         } catch (Throwable t) {
             log.error("Got error", t);
         } finally {
+            if (!produceEnough) {
+                doneLatch.countDown();
+            }
             if (null != client) {
                 try {
                     client.close();
-                    PerfClientUtils.exit(-1);
                 } catch (PulsarClientException e) {
                     log.error("Failed to close test client", e);
                 }

Reply via email to