This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 636a749ace9cce7888c36764c493f2f39f494504
Author: Gjiangtao <[email protected]>
AuthorDate: Mon Dec 7 16:27:59 2020 +0800

    add pulsar-perf new feature: one subscription has more than one consumers 
(#8837)
    
    ### Motivation
    
    
    *The official performance test tool pulsar-perf now supports only one 
consumer per subscription, not multiple consumers per subscription.*
    
    ### Modifications
    
    - A new parameter `numSubscriptions` was added, which specifies the number 
of subscriptions per Topic
    - Change the definition of `numConsumers`: before: number of consumers per 
topic; After: the number of consumers per subscription
    - add new feature: one subscription has more than one consumers
    - add new parameter: `receiver-queue-size-across-partitions`, which means 
*Max total size of the receiver queue across partitions*
    
    (cherry picked from commit 78c8e3237f3e063cd61c1e22a0a56916e650851a)
---
 .../pulsar/testclient/PerformanceConsumer.java     | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 552b146..606262e 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -81,9 +81,12 @@ public class PerformanceConsumer {
         @Parameter(names = { "-t", "--num-topics" }, description = "Number of 
topics")
         public int numTopics = 1;
 
-        @Parameter(names = { "-n", "--num-consumers" }, description = "Number 
of consumers (per topic)")
+        @Parameter(names = { "-n", "--num-consumers" }, description = "Number 
of consumers (per subscription), only one consumer is allowed when 
subscriptionType is Exclusive")
         public int numConsumers = 1;
 
+        @Parameter(names = { "-ns", "--num-subscriptions" }, description = 
"Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
         @Parameter(names = { "-s", "--subscriber-name" }, description = 
"Subscriber name prefix")
         public String subscriberName = "sub";
 
@@ -99,6 +102,9 @@ public class PerformanceConsumer {
         @Parameter(names = { "-q", "--receiver-queue-size" }, description = 
"Size of the receiver queue")
         public int receiverQueueSize = 1000;
 
+        @Parameter(names = { "-p", "--receiver-queue-size-across-partitions" 
}, description = "Max total size of the receiver queue across partitions")
+        public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
         @Parameter(names = { "--replicated" }, description = "Whether the 
subscription status should be replicated")
         public boolean replicatedSubscription = false;
 
@@ -188,6 +194,12 @@ public class PerformanceConsumer {
             System.exit(-1);
         }
 
+        if (arguments.subscriptionType == SubscriptionType.Exclusive && 
arguments.numConsumers > 1) {
+            System.out.println("Only one consumer is allowed when 
subscriptionType is Exclusive");
+            jc.usage();
+            System.exit(-1);
+        }
+
         if (arguments.confFile != null) {
             Properties prop = new Properties(System.getProperties());
             prop.load(new FileInputStream(arguments.confFile));
@@ -306,6 +318,7 @@ public class PerformanceConsumer {
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
                 .messageListener(listener) //
                 .receiverQueueSize(arguments.receiverQueueSize) //
+                
.maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions)
                 
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, 
TimeUnit.MILLISECONDS) //
                 .subscriptionType(arguments.subscriptionType)
                 
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
@@ -328,18 +341,20 @@ public class PerformanceConsumer {
         for (int i = 0; i < arguments.numTopics; i++) {
             final TopicName topicName = (arguments.numTopics == 1) ? 
prefixTopicName
                     : TopicName.get(String.format("%s-%d", prefixTopicName, 
i));
-            log.info("Adding {} consumers on topic {}", 
arguments.numConsumers, topicName);
+            log.info("Adding {} consumers per subscription on topic {}", 
arguments.numConsumers, topicName);
 
-            for (int j = 0; j < arguments.numConsumers; j++) {
+            for (int j = 0; j < arguments.numSubscriptions; j++) {
                 String subscriberName;
-                if (arguments.numConsumers > 1) {
+                if (arguments.numSubscriptions > 1) {
                     subscriberName = String.format("%s-%d", 
arguments.subscriberName, j);
                 } else {
                     subscriberName = arguments.subscriberName;
                 }
 
-                
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
-                        .subscribeAsync());
+                for (int k = 0; k < arguments.numConsumers; k++) {
+                    
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
+                            .subscribeAsync());
+                }
             }
         }
 
@@ -347,7 +362,7 @@ public class PerformanceConsumer {
             future.get();
         }
 
-        log.info("Start receiving from {} consumers on {} topics", 
arguments.numConsumers,
+        log.info("Start receiving from {} consumers per subscription on {} 
topics", arguments.numConsumers,
                 arguments.numTopics);
 
         long start = System.nanoTime();

Reply via email to