This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new e51bb7bc741 [fix] [Perf] PerformanceProducer do not produce expected
number of messages. (#19775)
e51bb7bc741 is described below
commit e51bb7bc7412ef48651609eb62449fdd1debed9d
Author: thetumbled <[email protected]>
AuthorDate: Tue Jun 13 08:19:28 2023 +0800
[fix] [Perf] PerformanceProducer do not produce expected number of
messages. (#19775)
Co-authored-by: tison <[email protected]>
---
.../org/apache/pulsar/testclient/PerformanceProducer.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 446382f4dff..93009ed0728 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -462,6 +462,7 @@ public class PerformanceProducer {
oldTime = now;
}
+ PerfClientUtils.exit(0);
}
static IMessageFormatter getMessageFormatter(String formatterClass) {
@@ -483,6 +484,7 @@ public class PerformanceProducer {
Random random,
CountDownLatch doneLatch) {
PulsarClient client = null;
+ boolean produceEnough = false;
try {
// Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
@@ -580,6 +582,9 @@ public class PerformanceProducer {
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new
Semaphore(arguments.numMessagesPerTransaction);
while (true) {
+ if (produceEnough) {
+ break;
+ }
for (Producer<byte[]> producer : producers) {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
@@ -587,7 +592,8 @@ public class PerformanceProducer {
+ "--------------", arguments.testTime);
doneLatch.countDown();
Thread.sleep(5000);
- PerfClientUtils.exit(0);
+ produceEnough = true;
+ break;
}
}
@@ -597,7 +603,8 @@ public class PerformanceProducer {
, numMessages);
doneLatch.countDown();
Thread.sleep(5000);
- PerfClientUtils.exit(0);
+ produceEnough = true;
+ break;
}
}
rateLimiter.acquire();
@@ -722,10 +729,12 @@ public class PerformanceProducer {
} catch (Throwable t) {
log.error("Got error", t);
} finally {
+ if (!produceEnough) {
+ doneLatch.countDown();
+ }
if (null != client) {
try {
client.close();
- PerfClientUtils.exit(-1);
} catch (PulsarClientException e) {
log.error("Failed to close test client", e);
}