This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 25bfc4668ae [fix] [Perf] PerformanceProducer do not produce expected
number of messages. (#19775)
25bfc4668ae is described below
commit 25bfc4668ae31effb784c4ca04443924ec5b27e1
Author: thetumbled <[email protected]>
AuthorDate: Tue Jun 13 08:19:28 2023 +0800
[fix] [Perf] PerformanceProducer do not produce expected number of
messages. (#19775)
Co-authored-by: tison <[email protected]>
---
.../org/apache/pulsar/testclient/PerformanceProducer.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 554d57972c9..762ad61b837 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -547,6 +547,7 @@ public class PerformanceProducer {
oldTime = now;
}
+ PerfClientUtils.exit(0);
}
static IMessageFormatter getMessageFormatter(String formatterClass) {
@@ -568,6 +569,7 @@ public class PerformanceProducer {
Random random,
CountDownLatch doneLatch) {
PulsarClient client = null;
+ boolean produceEnough = false;
try {
// Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
@@ -683,6 +685,9 @@ public class PerformanceProducer {
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new
Semaphore(arguments.numMessagesPerTransaction);
while (true) {
+ if (produceEnough) {
+ break;
+ }
for (Producer<byte[]> producer : producers) {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
@@ -690,7 +695,8 @@ public class PerformanceProducer {
+ "--------------", arguments.testTime);
doneLatch.countDown();
Thread.sleep(5000);
- PerfClientUtils.exit(0);
+ produceEnough = true;
+ break;
}
}
@@ -700,7 +706,8 @@ public class PerformanceProducer {
, numMessages);
doneLatch.countDown();
Thread.sleep(5000);
- PerfClientUtils.exit(0);
+ produceEnough = true;
+ break;
}
}
rateLimiter.acquire();
@@ -825,10 +832,12 @@ public class PerformanceProducer {
} catch (Throwable t) {
log.error("Got error", t);
} finally {
+ if (!produceEnough) {
+ doneLatch.countDown();
+ }
if (null != client) {
try {
client.close();
- PerfClientUtils.exit(-1);
} catch (PulsarClientException e) {
log.error("Failed to close test client", e);
}