This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bdb620369b8ff419c5d27dbb36d530bef472c2fc
Author: lipenghui <[email protected]>
AuthorDate: Sun Apr 24 21:01:23 2022 +0800

    [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)

    (cherry picked from commit 188d4f4942e549e101757a73aa8785f2f3a2dbd4)
---
 .../pulsar/testclient/PerformanceProducer.java     |  4 +++-
 .../pulsar/testclient/PerformanceProducerTest.java | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index d297eb7d7bd..f18f4a84e13 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -597,10 +597,12 @@ public class PerformanceProducer {
                 .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
                 .compressionType(arguments.compression) //
                 .maxPendingMessages(arguments.maxOutstanding) //
-                .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
                 .accessMode(arguments.producerAccessMode)
                 // enable round robin message routing if it is a partitioned topic
                 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        if (arguments.maxPendingMessagesAcrossPartitions > 0) {
+            producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
+        }
 
         AtomicReference<Transaction> transactionAtomicReference;
         if (arguments.isEnableTransaction) {
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 50174ed4b70..99b615678da 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -190,4 +190,25 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
     }
 
+    @Test
+    public void testMaxOutstanding() throws Exception {
+        String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000";
+        String topic = testTopic + UUID.randomUUID().toString();
+        String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Key_Shared).subscribe();
+        new Thread(() -> {
+            try {
+                PerformanceProducer.main(args.split(" "));
+            } catch (Exception e) {
+                log.error("Failed to start perf producer");
+            }
+        }).start();
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+                    assertNotNull(message);
+                });
+        consumer.close();
+    }
 }
