This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9030e84 [Issue 6793][broker] Add config of
maxNumPartitionsPerPartitionedTopic (#6794)
9030e84 is described below
commit 9030e84fddf6c4764924c3ad6aca6b72d192c103
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Apr 29 13:31:32 2020 +0900
[Issue 6793][broker] Add config of maxNumPartitionsPerPartitionedTopic
(#6794)
Fixes #6793
### Motivation
Now broker, can't set max number of partitions per partitioned topic. So we
can't prevent to create partitioned topic by many partitions from server side.
In this PR, introduce limit of max partitions to broker and to be able to
control it.
### Modifications
Add `maxNumPartitionsPerPartitionedTopic` config to broker and compare with
numPartitions at create or update partitioned topic.
If the config is set to `0`(is default), then disable the check.
---
conf/broker.conf | 4 ++++
conf/standalone.conf | 4 ++++
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++++++
.../apache/pulsar/broker/admin/AdminResource.java | 6 +++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 4 ++++
.../broker/admin/impl/PersistentTopicsBase.java | 4 ++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 1 +
.../broker/admin/v1/NonPersistentTopics.java | 1 +
.../pulsar/broker/admin/v1/PersistentTopics.java | 2 ++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 1 +
.../broker/admin/v2/NonPersistentTopics.java | 1 +
.../pulsar/broker/admin/v2/PersistentTopics.java | 3 ++-
.../pulsar/broker/service/BrokerService.java | 4 +++-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 26 +++++++++++++++++++++-
.../BrokerServiceAutoTopicCreationTest.java | 18 +++++++++++++++
site2/docs/reference-configuration.md | 2 ++
16 files changed, 85 insertions(+), 3 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index ef1cb89..2c5d6b9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -353,6 +353,10 @@ messagePublishBufferCheckIntervalInMillis=100
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120
+# Max number of partitions per partitioned topic
+# Use 0 or negative number to disable the check
+maxNumPartitionsPerPartitionedTopic=0
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request
with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 43138f8..0fd80a3 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -221,6 +221,10 @@ maxConsumersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0
+# Max number of partitions per partitioned topic
+# Use 0 or negative number to disable the check
+maxNumPartitionsPerPartitionedTopic=0
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request
with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fafeeb0..2b0b0e8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -645,6 +645,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int retentionCheckIntervalInSeconds = 120;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The number of partitions per partitioned topic.\n"
+ + "If try to create or update partitioned topics by exceeded
number of partitions, then fail."
+ )
+ private int maxNumPartitionsPerPartitionedTopic = 0;
+
/**** --- Messaging Protocols --- ****/
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index cafb183..eedcd80 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -727,6 +727,7 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse,
int numPartitions) {
+ final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
@@ -738,11 +739,16 @@ public abstract class AdminResource extends
PulsarWebResource {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be more than 0"));
return;
}
+ if (maxPartitions > 0 && numPartitions > maxPartitions) {
+ asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions));
+ return;
+ }
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This
topic already exists"));
} else {
+
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b5835fa..bb584a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -563,12 +563,16 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride autoTopicCreationOverride) {
+ final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if
(!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid
configuration for autoTopicCreationOverride");
}
+ if (maxPartitions > 0 &&
autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
+ throw new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be less than or equal to " + maxPartitions);
+ }
// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES,
namespaceName.toString())).thenApply(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0caadeb..b53e12e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -475,6 +475,10 @@ public class PersistentTopicsBase extends AdminResource {
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(),
numPartitions);
}
+ final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+ if (maxPartitions > 0 && numPartitions > maxPartitions) {
+ throw new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be less than or equal to " + maxPartitions);
+ }
if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters =
getNamespaceReplicatedClusters(topicName.getNamespaceObject());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4a38299..957a23a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -400,6 +400,7 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting
for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation
override") })
public void setAutoTopicCreation(@Suspended final AsyncResponse
asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 2338b0f..dffd4a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -123,6 +123,7 @@ public class NonPersistentTopics extends PersistentTopics {
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes
= "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already
exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse
asyncResponse, @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 236473a..31367cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -147,6 +147,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already
exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse
asyncResponse, @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@@ -180,6 +181,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not
exist") })
public void updatePartitionedTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 96793d9..dd2c57e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -306,6 +306,7 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting
for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation
override") })
public void setAutoTopicCreation(
@Suspended final AsyncResponse asyncResponse,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 3756f82..103e4de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -158,6 +158,7 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 401, message = "Don't have permission to
manage resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "The tenant/namespace does not
exist"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already
exists"),
@ApiResponse(code = 412, message = "Failed Reason : Name is
invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9bf72dc..789b7ef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -189,6 +189,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already
exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is
invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@@ -264,7 +265,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 401, message = "Don't have permission to
adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
- @ApiResponse(code = 406, message = "The number of partitions
should be more than 0"),
+ @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not
exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
@ApiResponse(code = 500, message = "Internal server error")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7ddb138..7f69792 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1894,8 +1894,10 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata>
createDefaultPartitionedTopicAsync(TopicName topicName) {
- int defaultNumPartitions =
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+ final int defaultNumPartitions =
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+ final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0, "Default number of partitions
should be more than 0");
+ checkArgument(maxPartitions <= 0 || defaultNumPartitions <=
maxPartitions, "Number of partitions should be less than or equal to " +
maxPartitions);
PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture =
futureWithDeadline();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 558ce39..1fad0f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -60,7 +60,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -1245,4 +1244,29 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
}
+ @Test
+ public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
+ final String topic =
"persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-success";
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);
+
+ try {
+ admin.topics().createPartitionedTopic(topic, 2);
+ } catch (Exception e) {
+ fail("should not throw any exceptions");
+ }
+ }
+
+ @Test
+ public void testMaxNumPartitionsPerPartitionedTopicFailure() {
+ final String topic =
"persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-failure";
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);
+
+ try {
+ admin.topics().createPartitionedTopic(topic, 3);
+ fail("should throw exception when number of partitions exceed than
max partitions");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarAdminException);
+ }
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 275ec3f..0421e29 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -322,4 +322,22 @@ public class BrokerServiceAutoTopicCreationTest extends
BrokerTestBase{
}
}
+
+ @Test
+ public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);
+
+ final String topicString =
"persistent://prop/ns-abc/partitioned-test-topic-11";
+ final String subscriptionName = "test-topic-sub-11";
+
+ try {
+
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+ fail("should throw exception when number of partitions exceed than
max partitions");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException);
+ }
+ }
}
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index d0fe84f..2ce29d5 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -143,6 +143,7 @@ Pulsar brokers are responsible for handling incoming
messages from producers, di
|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no
version information |true|
|statusFilePath| Path for the file used to determine the rotation status for
the broker when responding to service discovery health checks ||
|preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the
load manager will attempt to use only brokers running the latest software
version (to minimize impact to bundles) |false|
+|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned
topic. Use 0 or negative number to disable the check|0|
|tlsEnabled| Enable TLS |false|
|tlsCertificateFilePath| Path for the TLS certificate file ||
|tlsKeyFilePath| Path for the TLS private key file ||
@@ -355,6 +356,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client)
CLI tool can be used
|statusFilePath| The path for the file used to determine the rotation status
for the broker when responding to service discovery health checks
|/usr/local/apache/htdocs|
|maxUnackedMessagesPerConsumer| The maximum number of unacknowledged messages
allowed to be received by consumers on a shared subscription. The broker will
stop sending messages to a consumer once this limit is reached or until the
consumer begins acknowledging messages. A value of 0 disables the unacked
message limit check and thus allows consumers to receive messages without any
restrictions. |50000|
|maxUnackedMessagesPerSubscription| The same as above, except per subscription
rather than per consumer. |200000|
+|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned
topic. Use 0 or negative number to disable the check|0|
|authenticationEnabled| Enable authentication for the broker. |false|
|authenticationProviders| A comma-separated list of class names for
authentication providers. |false|
|authorizationEnabled| Enforce authorization in brokers. |false|