This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 636a749ace9cce7888c36764c493f2f39f494504 Author: Gjiangtao <[email protected]> AuthorDate: Mon Dec 7 16:27:59 2020 +0800 add pulsar-perf new feature: one subscription has more than one consumers (#8837) ### Motivation *The official performance test tool pulsar-perf now supports only one consumer per subscription, not multiple consumers per subscription.* ### Modifications - A new parameter `numSubscriptions` was added, which specifies the number of subscriptions per Topic - Change the definition of `numConsumers`: before: number of consumers per topic; After: the number of consumers per subscription - add new feature: one subscription has more than one consumers - add new parameter: `receiver-queue-size-across-partitions`, which means *Max total size of the receiver queue across partitions* (cherry picked from commit 78c8e3237f3e063cd61c1e22a0a56916e650851a) --- .../pulsar/testclient/PerformanceConsumer.java | 29 ++++++++++++++++------ 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 552b146..606262e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -81,9 +81,12 @@ public class PerformanceConsumer { @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics") public int numTopics = 1; - @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per topic)") + @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive") public int numConsumers = 1; + @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)") + public int numSubscriptions = 1; + @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix") public String subscriberName = "sub"; @@ -99,6 +102,9 @@ public class PerformanceConsumer { @Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue") public int receiverQueueSize = 1000; + @Parameter(names = { "-p", "--receiver-queue-size-across-partitions" }, description = "Max total size of the receiver queue across partitions") + public int maxTotalReceiverQueueSizeAcrossPartitions = 50000; + @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated") public boolean replicatedSubscription = false; @@ -188,6 +194,12 @@ public class PerformanceConsumer { System.exit(-1); } + if (arguments.subscriptionType == SubscriptionType.Exclusive && arguments.numConsumers > 1) { + System.out.println("Only one consumer is allowed when subscriptionType is Exclusive"); + jc.usage(); + System.exit(-1); + } + if (arguments.confFile != null) { Properties prop = new Properties(System.getProperties()); prop.load(new FileInputStream(arguments.confFile)); @@ -306,6 +318,7 @@ public class PerformanceConsumer { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() // .messageListener(listener) // .receiverQueueSize(arguments.receiverQueueSize) // + .maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions) .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // .subscriptionType(arguments.subscriptionType) .subscriptionInitialPosition(arguments.subscriptionInitialPosition) @@ -328,18 +341,20 @@ public class PerformanceConsumer { for (int i = 0; i < arguments.numTopics; i++) { final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName : TopicName.get(String.format("%s-%d", prefixTopicName, i)); - log.info("Adding {} consumers on topic {}", arguments.numConsumers, topicName); + log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName); - for (int j = 0; j < arguments.numConsumers; j++) { + for (int j = 0; j < arguments.numSubscriptions; j++) { String subscriberName; - if (arguments.numConsumers > 1) { + if (arguments.numSubscriptions > 1) { subscriberName = String.format("%s-%d", arguments.subscriberName, j); } else { subscriberName = arguments.subscriberName; } - futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName) - .subscribeAsync()); + for (int k = 0; k < arguments.numConsumers; k++) { + futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName) + .subscribeAsync()); + } } } @@ -347,7 +362,7 @@ public class PerformanceConsumer { future.get(); } - log.info("Start receiving from {} consumers on {} topics", arguments.numConsumers, + log.info("Start receiving from {} consumers per subscription on {} topics", arguments.numConsumers, arguments.numTopics); long start = System.nanoTime();
