This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 188d4f4942e [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)
188d4f4942e is described below
commit 188d4f4942e549e101757a73aa8785f2f3a2dbd4
Author: lipenghui <[email protected]>
AuthorDate: Sun Apr 24 21:01:23 2022 +0800
[fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)
---
.../pulsar/testclient/PerformanceProducer.java | 4 +++-
.../pulsar/testclient/PerformanceProducerTest.java | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index d297eb7d7bd..f18f4a84e13 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -597,10 +597,12 @@ public class PerformanceProducer {
.sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
- .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
.accessMode(arguments.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+ if (arguments.maxPendingMessagesAcrossPartitions > 0) {
+ producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
+ }
AtomicReference<Transaction> transactionAtomicReference;
if (arguments.isEnableTransaction) {
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 920a36bd56a..3adedc3ac60 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -190,4 +190,25 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
}
+ @Test
+ public void testMaxOutstanding() throws Exception {
+ String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000";
+ String topic = testTopic + UUID.randomUUID().toString();
+ String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Key_Shared).subscribe();
+ new Thread(() -> {
+ try {
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ log.error("Failed to start perf producer");
+ }
+ }).start();
+ Awaitility.await()
+ .untilAsserted(() -> {
+ Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ });
+ consumer.close();
+ }
}