This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f4a3f4a0126cfd863772fa2fed57c1ef0113563c Author: Fangbin Sun <[email protected]> AuthorDate: Wed Mar 3 09:06:11 2021 +0800 [Issue 8340] [pulsar-testclient] Fix to support to specify topics and subscriptions (#9716) ### Motivation Fixes #8340 ### Modifications - Fix the main parameter of the `pulsar-perf produce/consume/read` CLI commands so that multiple topics can be specified. - Add the option `--subscriptions` to the `pulsar-perf consume` CLI command so that multiple subscriptions can be specified. - Deprecate and hide the option `--subscriber-name`. - Update the corresponding documentation. (cherry picked from commit e137f265c672f36a7b04711c120b89cefcadf61b) --- .../pulsar/testclient/PerformanceConsumer.java | 37 ++++++++++++---------- .../pulsar/testclient/PerformanceProducer.java | 8 ++--- .../pulsar/testclient/PerformanceReader.java | 9 ++---- site2/docs/reference-cli-tools.md | 8 ++--- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 0ee99a5..27bc102 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -22,10 +22,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.FileInputStream; -import java.nio.file.Paths; import java.text.DecimalFormat; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,9 +83,13 @@ public class PerformanceConsumer { @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)") public int numSubscriptions = 1; - @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix") 
+ @Deprecated + @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true) public String subscriberName = "sub"; + @Parameter(names = { "-ss", "--subscriptions" }, description = "A list of subscriptions to consume on (e.g. sub1,sub2)") + public List<String> subscriptions = Collections.singletonList("sub"); + @Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type") public SubscriptionType subscriptionType = SubscriptionType.Exclusive; @@ -105,7 +108,7 @@ public class PerformanceConsumer { @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated") public boolean replicatedSubscription = false; - @Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis") + @Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis") public int acknowledgmentsGroupingDelayMillis = 100; @Parameter(names = { "-c", @@ -185,8 +188,8 @@ public class PerformanceConsumer { System.exit(-1); } - if (arguments.topic.size() != 1) { - System.out.println("Only one topic name is allowed"); + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { + System.out.println("The size of topics list should be equal to --num-topics"); jc.usage(); System.exit(-1); } @@ -197,6 +200,14 @@ public class PerformanceConsumer { System.exit(-1); } + if (arguments.subscriptionType != SubscriptionType.Exclusive && + arguments.subscriptions != null && + arguments.subscriptions.size() != arguments.numConsumers) { + System.out.println("The size of subscriptions list should be equal to --num-consumers when subscriptionType isn't Exclusive"); + jc.usage(); + System.exit(-1); + } + if (arguments.confFile != null) { Properties prop = new Properties(System.getProperties()); prop.load(new FileInputStream(arguments.confFile)); @@ -237,8 +248,6 @@ public class PerformanceConsumer { ObjectWriter 
w = m.writerWithDefaultPrettyPrinter(); log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments)); - final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0)); - final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; long startTime = System.nanoTime(); long testEndTime = startTime + (long) (arguments.testTime * 1e9); @@ -313,18 +322,12 @@ public class PerformanceConsumer { } for (int i = 0; i < arguments.numTopics; i++) { - final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName - : TopicName.get(String.format("%s-%d", prefixTopicName, i)); + final TopicName topicName = TopicName.get(arguments.topic.get(i)); + log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName); for (int j = 0; j < arguments.numSubscriptions; j++) { - String subscriberName; - if (arguments.numSubscriptions > 1) { - subscriberName = String.format("%s-%d", arguments.subscriberName, j); - } else { - subscriberName = arguments.subscriberName; - } - + String subscriberName = arguments.subscriptions.get(j); for (int k = 0; k < arguments.numConsumers; k++) { futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName) .subscribeAsync()); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 92dda6e..0c91fce 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -42,7 +42,6 @@ import java.nio.file.Paths; import java.text.DecimalFormat; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -234,8 +233,8 @@ public class 
PerformanceProducer { System.exit(-1); } - if (arguments.topics.size() != 1) { - System.out.println("Only one topic name is allowed"); + if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) { + System.out.println("The size of topics list should be equal to --num-topic"); jc.usage(); System.exit(-1); } @@ -396,7 +395,6 @@ public class PerformanceProducer { PulsarClient client = null; try { // Now processing command line arguments - String prefixTopicName = arguments.topics.get(0); List<Future<Producer<byte[]>>> futures = Lists.newArrayList(); ClientBuilder clientBuilder = PulsarClient.builder() // @@ -449,7 +447,7 @@ public class PerformanceProducer { } for (int i = 0; i < arguments.numTopics; i++) { - String topic = (arguments.numTopics == 1) ? prefixTopicName : String.format("%s-%d", prefixTopicName, i); + String topic = arguments.topics.get(i); log.info("Adding {} publishers on topic {}", arguments.numProducers, topic); for (int j = 0; j < arguments.numProducers; j++) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index 0d21196..97a00fc 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -141,8 +141,8 @@ public class PerformanceReader { System.exit(-1); } - if (arguments.topic.size() != 1) { - System.out.println("Only one topic name is allowed"); + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { + System.out.println("The size of topics list should be equal to --num-topics"); jc.usage(); System.exit(-1); } @@ -191,8 +191,6 @@ public class PerformanceReader { ObjectWriter w = m.writerWithDefaultPrettyPrinter(); log.info("Starting Pulsar performance reader with config: {}", w.writeValueAsString(arguments)); - final TopicName 
prefixTopicName = TopicName.get(arguments.topic.get(0)); - final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; long startTime = System.nanoTime(); long testEndTime = startTime + (long) (arguments.testTime * 1e9); @@ -251,8 +249,7 @@ public class PerformanceReader { .startMessageId(startMessageId); for (int i = 0; i < arguments.numTopics; i++) { - final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName - : TopicName.get(String.format("%s-%d", prefixTopicName, i)); + final TopicName topicName = TopicName.get(arguments.topic.get(i)); futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync()); } diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md index 0d7a6f9..09a1d8e 100644 --- a/site2/docs/reference-cli-tools.md +++ b/site2/docs/reference-cli-tools.md @@ -422,19 +422,19 @@ Options |`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.|| |`--auth_plugin`|Authentication plugin class name|| |`--listener-name`|Listener name for the broker|| -|`--acks-delay-millis`|Acknowlegments grouping delay in millis|100| +|`--acks-delay-millis`|Acknowledgements grouping delay in millis|100| |`-k`, `--encryption-key-name`|The private key name to decrypt payload|| |`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload|| |`-h`, `--help`|Help message|false| |`--conf-file`|Configuration file|| |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100| |`-n`, `--num-consumers`|Number of consumers (per topic)|1| -|`-t`, `--num-topic`|The number of topics|1| +|`-t`, `--num-topics`|The number of topics|1| |`-r`, `--rate`|Simulate a slow message consumer (rate in msg/s)|0| |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000| |`-u`, `--service-url`|Pulsar 
service URL|| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0| -|`-s`, `--subscriber-name`|Subscriber name prefix|sub| +|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub| |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive| |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest| |`--trust-cert-file`|Path for the trusted TLS certificate file|| @@ -497,7 +497,7 @@ Options |`--conf-file`|Configuration file|| |`-h`, `--help`|Help message|false| |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100| -|`-t`, `--num-topic`|The number of topics|1| +|`-t`, `--num-topics`|The number of topics|1| |`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0| |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000| |`-u`, `--service-url`|Pulsar service URL||
