sijie commented on a change in pull request #9859:
URL: https://github.com/apache/pulsar/pull/9859#discussion_r593513476



##########
File path: 
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -140,6 +148,9 @@
         @Parameter(names = { "-p", "--max-outstanding-across-partitions" }, 
description = "Max number of outstanding messages across partitions")
         public int maxPendingMessagesAcrossPartitions = 
DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 
+        @Parameter(names = { "-np", "--partitions" }, description = "Create 
partitioned topics with the given number of partitions, set 0 to not try to 
create the topic")
+        public int createTopicPartitions = 0;

Review comment:
       ```suggestion
           public Integer createTopicPartitions =  null;
   ```
   
   Please use boxed integer to indicate if it is going to create a partitioned 
topic or not.

##########
File path: 
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -325,6 +341,31 @@ public static void main(String[] args) throws Exception {
             printAggregatedStats();
         }));
 
+        if (arguments.createTopicPartitions > 0) {
+            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
+                    .serviceHttpUrl(arguments.adminURL)
+                    .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+            if (isNotBlank(arguments.authPluginClassName)) {
+                clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
+            }
+
+            if (arguments.tlsAllowInsecureConnection != null) {
+                
clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+            }
+
+            try (PulsarAdmin client = clientBuilder.build();) {
+                for (String topic : arguments.topics) {
+                    log.info("Creating partitioned topic {} with {} 
partitions", topic, arguments.createTopicPartitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, 
arguments.createTopicPartitions);
+                    } catch (PulsarAdminException.ConflictException 
alreadyExists) {
+                        log.debug("Topic "+topic+" already exists: " + 
alreadyExists);

Review comment:
       `log.debug` should be put in `if (log.isDebugEnabled())`

##########
File path: 
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -140,6 +148,9 @@
         @Parameter(names = { "-p", "--max-outstanding-across-partitions" }, 
description = "Max number of outstanding messages across partitions")
         public int maxPendingMessagesAcrossPartitions = 
DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 
+        @Parameter(names = { "-np", "--partitions" }, description = "Create 
partitioned topics with the given number of partitions, set 0 to not try to 
create the topic")
+        public int createTopicPartitions = 0;

Review comment:
       ```suggestion
           public int partitions = 0;
   ```

##########
File path: 
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -325,6 +343,37 @@ public static void main(String[] args) throws Exception {
             printAggregatedStats();
         }));
 
+        if (arguments.createTopicPartitions > 0) {
+            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
+                    .serviceHttpUrl(arguments.adminURL)
+                    .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+            if (isNotBlank(arguments.authPluginClassName)) {
+                clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
+            }
+
+            if (arguments.tlsAllowInsecureConnection != null) {
+                
clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+            }
+
+            try (PulsarAdmin client = clientBuilder.build();) {
+                for (String topic : arguments.topics) {
+                    log.info("Creating partitioned topic {} with {} 
partitions", topic, arguments.createTopicPartitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, 
arguments.createTopicPartitions);
+                    } catch (PulsarAdminException.ConflictException 
alreadyExists) {
+                        log.debug("Topic "+topic+" already exists: " + 
alreadyExists);
+                        PartitionedTopicMetadata partitionedTopicMetadata = 
client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != 
arguments.createTopicPartitions) {
+                            log.error("Topic {} already exists but it has a 
wrong number of partitions: {}, expecting {}",
+                                    topic, 
partitionedTopicMetadata.partitions, arguments.createTopicPartitions);
+                            System.exit(-1);

Review comment:
       why exit(-1) here? Why can we throw an exception?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to