This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9030e84  [Issue 6793][broker] Add config of 
maxNumPartitionsPerPartitionedTopic (#6794)
9030e84 is described below

commit 9030e84fddf6c4764924c3ad6aca6b72d192c103
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Apr 29 13:31:32 2020 +0900

    [Issue 6793][broker] Add config of maxNumPartitionsPerPartitionedTopic 
(#6794)
    
    Fixes #6793
    
    ### Motivation
    Now broker, can't set max number of partitions per partitioned topic. So we 
can't prevent to create partitioned topic by many partitions from server side.
    In this PR, introduce limit of max partitions to broker and to be able to 
control it.
    
    ### Modifications
    Add `maxNumPartitionsPerPartitionedTopic` config to broker and compare with 
numPartitions at create or update partitioned topic.
    If the config is set to `0`(is default), then disable the check.
---
 conf/broker.conf                                   |  4 ++++
 conf/standalone.conf                               |  4 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++++++
 .../apache/pulsar/broker/admin/AdminResource.java  |  6 +++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  4 ++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  1 +
 .../broker/admin/v1/NonPersistentTopics.java       |  1 +
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  2 ++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  1 +
 .../broker/admin/v2/NonPersistentTopics.java       |  1 +
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  3 ++-
 .../pulsar/broker/service/BrokerService.java       |  4 +++-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 26 +++++++++++++++++++++-
 .../BrokerServiceAutoTopicCreationTest.java        | 18 +++++++++++++++
 site2/docs/reference-configuration.md              |  2 ++
 16 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index ef1cb89..2c5d6b9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -353,6 +353,10 @@ messagePublishBufferCheckIntervalInMillis=100
 # Use 0 or negative number to disable the check
 retentionCheckIntervalInSeconds=120
 
+# Max number of partitions per partitioned topic
+# Use 0 or negative number to disable the check
+maxNumPartitionsPerPartitionedTopic=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 43138f8..0fd80a3 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -221,6 +221,10 @@ maxConsumersPerTopic=0
 # Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
 maxConsumersPerSubscription=0
 
+# Max number of partitions per partitioned topic
+# Use 0 or negative number to disable the check
+maxNumPartitionsPerPartitionedTopic=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fafeeb0..2b0b0e8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -645,6 +645,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int retentionCheckIntervalInSeconds = 120;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The number of partitions per partitioned topic.\n"
+                + "If try to create or update partitioned topics by exceeded 
number of partitions, then fail."
+    )
+    private int maxNumPartitionsPerPartitionedTopic = 0;
+
     /**** --- Messaging Protocols --- ****/
 
     @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index cafb183..eedcd80 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -727,6 +727,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions) {
+        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         try {
             validateAdminAccessForTenant(topicName.getTenant());
         } catch (Exception e) {
@@ -738,11 +739,16 @@ public abstract class AdminResource extends 
PulsarWebResource {
             asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, 
"Number of partitions should be more than 0"));
             return;
         }
+        if (maxPartitions > 0 && numPartitions > maxPartitions) {
+            asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, 
"Number of partitions should be less than or equal to " + maxPartitions));
+            return;
+        }
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This 
topic already exists"));
             } else {
+
                 try {
                     String path = ZkAdminPaths.partitionedTopicPath(topicName);
                     byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b5835fa..bb584a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -563,12 +563,16 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, 
AutoTopicCreationOverride autoTopicCreationOverride) {
+        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
 
         if 
(!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid 
configuration for autoTopicCreationOverride");
         }
+        if (maxPartitions > 0 && 
autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be less than or equal to " + maxPartitions);
+        }
 
         // Force to read the data s.t. the watch to the cache content is setup.
         policiesCache().getWithStatAsync(path(POLICIES, 
namespaceName.toString())).thenApply(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0caadeb..b53e12e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -475,6 +475,10 @@ public class PersistentTopicsBase extends AdminResource {
         if (!updateLocalTopicOnly) {
             validatePartitionTopicUpdate(topicName.getLocalName(), 
numPartitions);
         }
+        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+        if (maxPartitions > 0 && numPartitions > maxPartitions) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be less than or equal to " + maxPartitions);
+        }
 
         if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
             Set<String> clusters = 
getNamespaceReplicatedClusters(topicName.getNamespaceObject());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4a38299..957a23a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -400,6 +400,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Override broker's allowAutoTopicCreation setting 
for a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 400, message = "Invalid autoTopicCreation 
override") })
     public void setAutoTopicCreation(@Suspended final AsyncResponse 
asyncResponse,
                                      @PathParam("property") String property, 
@PathParam("cluster") String cluster,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 2338b0f..dffd4a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -123,6 +123,7 @@ public class NonPersistentTopics extends PersistentTopics {
     @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
     @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes 
= "It needs to be called before creating a producer on a partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse, @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 236473a..31367cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -147,6 +147,7 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse, @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
@@ -180,6 +181,7 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not 
exist") })
     public void updatePartitionedTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 96793d9..dd2c57e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -306,6 +306,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Override broker's allowAutoTopicCreation setting 
for a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 400, message = "Invalid autoTopicCreation 
override") })
     public void setAutoTopicCreation(
             @Suspended final AsyncResponse asyncResponse,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 3756f82..103e4de 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -158,6 +158,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 401, message = "Don't have permission to 
manage resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "The tenant/namespace does not 
exist"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exists"),
             @ApiResponse(code = 412, message = "Failed Reason : Name is 
invalid or Namespace does not have any clusters configured"),
             @ApiResponse(code = 500, message = "Internal server error"),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9bf72dc..789b7ef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -189,6 +189,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist"),
             @ApiResponse(code = 412, message = "Failed Reason : Name is 
invalid or Namespace does not have any clusters configured"),
             @ApiResponse(code = 500, message = "Internal server error"),
@@ -264,7 +265,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 401, message = "Don't have permission to 
adminisActions to be grantedtrate resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
-            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0"),
+            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not 
exist"),
             @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
             @ApiResponse(code = 500, message = "Internal server error")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7ddb138..7f69792 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1894,8 +1894,10 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     @SuppressWarnings("deprecation")
     private CompletableFuture<PartitionedTopicMetadata> 
createDefaultPartitionedTopicAsync(TopicName topicName) {
-        int defaultNumPartitions = 
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+        final int defaultNumPartitions = 
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         checkArgument(defaultNumPartitions > 0, "Default number of partitions 
should be more than 0");
+        checkArgument(maxPartitions <= 0 || defaultNumPartitions <= 
maxPartitions, "Number of partitions should be less than or equal to " + 
maxPartitions);
 
         PartitionedTopicMetadata configMetadata = new 
PartitionedTopicMetadata(defaultNumPartitions);
         CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = 
futureWithDeadline();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 558ce39..1fad0f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -60,7 +60,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -1245,4 +1244,29 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
     }
 
+    @Test
+    public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
+        final String topic = 
"persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-success";
+        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);
+
+        try {
+            admin.topics().createPartitionedTopic(topic, 2);
+        } catch (Exception e) {
+            fail("should not throw any exceptions");
+        }
+    }
+
+    @Test
+    public void testMaxNumPartitionsPerPartitionedTopicFailure() {
+        final String topic = 
"persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-failure";
+        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);
+
+        try {
+            admin.topics().createPartitionedTopic(topic, 3);
+            fail("should throw exception when number of partitions exceed than 
max partitions");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarAdminException);
+        }
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 275ec3f..0421e29 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -322,4 +322,22 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         }
 
     }
+
+    @Test
+    public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);
+
+        final String topicString = 
"persistent://prop/ns-abc/partitioned-test-topic-11";
+        final String subscriptionName = "test-topic-sub-11";
+
+        try {
+            
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+            fail("should throw exception when number of partitions exceed than 
max partitions");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException);
+        }
+    }
 }
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index d0fe84f..2ce29d5 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -143,6 +143,7 @@ Pulsar brokers are responsible for handling incoming 
messages from producers, di
 |clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no 
version information  |true|
 |statusFilePath|  Path for the file used to determine the rotation status for 
the broker when responding to service discovery health checks ||
 |preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the 
load manager will attempt to use only brokers running the latest software 
version (to minimize impact to bundles)  |false|
+|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned 
topic. Use 0 or negative number to disable the check|0|
 |tlsEnabled|  Enable TLS  |false|
 |tlsCertificateFilePath|  Path for the TLS certificate file ||
 |tlsKeyFilePath|  Path for the TLS private key file ||
@@ -355,6 +356,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) 
CLI tool can be used
 |statusFilePath|  The path for the file used to determine the rotation status 
for the broker when responding to service discovery health checks 
|/usr/local/apache/htdocs|
 |maxUnackedMessagesPerConsumer| The maximum number of unacknowledged messages 
allowed to be received by consumers on a shared subscription. The broker will 
stop sending messages to a consumer once this limit is reached or until the 
consumer begins acknowledging messages. A value of 0 disables the unacked 
message limit check and thus allows consumers to receive messages without any 
restrictions. |50000|
 |maxUnackedMessagesPerSubscription| The same as above, except per subscription 
rather than per consumer.  |200000|
+|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned 
topic. Use 0 or negative number to disable the check|0|
 |authenticationEnabled| Enable authentication for the broker. |false|
 |authenticationProviders| A comma-separated list of class names for 
authentication providers. |false|
 |authorizationEnabled|  Enforce authorization in brokers. |false|

Reply via email to