This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0d8c7ef4f38 [fix][admin]allowAutoTopicCreation and
allowAutoTopicCreationType etc should be dynamically configured (#16023)
0d8c7ef4f38 is described below
commit 0d8c7ef4f3897113e158214aae0a89d4847a7dd3
Author: Radiance' 波波 <[email protected]>
AuthorDate: Mon Jul 4 09:24:39 2022 +0800
[fix][admin]allowAutoTopicCreation and allowAutoTopicCreationType etc
should be dynamically configured (#16023)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 10 ++
.../pulsar/broker/service/BrokerService.java | 30 ++++++
.../BrokerServiceAutoSubscriptionCreationTest.java | 18 ++++
.../BrokerServiceAutoTopicCreationTest.java | 101 +++++++++++++++++++++
.../broker/service/InactiveTopicDeleteTest.java | 60 ++++++++++++
site2/docs/reference-configuration.md | 19 ++--
6 files changed, 229 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 47f7bddfab1..fd138abfce6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -544,12 +544,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "Enable the deletion of inactive topics.\n"
+ "If only enable this option, will not clean the metadata of
partitioned topic."
)
private boolean brokerDeleteInactiveTopicsEnabled = true;
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "Metadata of inactive partitioned topic will not be
automatically cleaned up by default.\n"
+ "Note: If `allowAutoTopicCreation` and this option are enabled
at the same time,\n"
+ "it may appear that a partitioned topic has just been deleted
but is automatically created as a "
@@ -558,12 +560,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled =
false;
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "How often to check for inactive topics"
)
private int brokerDeleteInactiveTopicsFrequencySeconds = 60;
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "Set the inactive topic delete mode. Default is
delete_when_no_subscriptions\n"
+ "'delete_when_no_subscriptions' mode only delete the topic which has
no subscriptions and no active "
+ "producers\n"
@@ -575,6 +579,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "Max duration of topic inactivity in seconds, default is not
present\n"
+ "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will
be used\n"
+ "Topics that are inactive for longer than this value will be deleted"
@@ -1207,6 +1212,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
+ dynamic = true,
doc = "The number of partitions per partitioned topic.\n"
+ "If try to create or update partitioned topics by exceeded
number of partitions, then fail."
)
@@ -1779,21 +1785,25 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Allow automated creation of topics if set to true (default
value)."
)
private boolean allowAutoTopicCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "The type of topic that is allowed to be automatically
created.(partitioned/non-partitioned)"
)
private String allowAutoTopicCreationType = "non-partitioned";
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Allow automated creation of subscriptions if set to true
(default value)."
)
private boolean allowAutoSubscriptionCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "The number of partitioned topics that is allowed to be
automatically created"
+ "if allowAutoTopicCreationType is partitioned."
)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7f6c256a3e8..f56b0012b31 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2319,9 +2319,39 @@ public class BrokerService implements Closeable {
// add listener to notify topic subscriptionTypesEnabled changed.
registerConfigurationListener("subscriptionTypesEnabled",
this::updateBrokerSubscriptionTypesEnabled);
+ // add listener to notify partitioned topic defaultNumPartitions
changed
+ registerConfigurationListener("defaultNumPartitions",
defaultNumPartitions -> {
+ this.updateDefaultNumPartitions((int) defaultNumPartitions);
+ });
+
+ // add listener to notify partitioned topic
maxNumPartitionsPerPartitionedTopic changed
+ registerConfigurationListener("maxNumPartitionsPerPartitionedTopic",
maxNumPartitions -> {
+ this.updateMaxNumPartitionsPerPartitionedTopic((int)
maxNumPartitions);
+ });
+
// add more listeners here
}
+ private void updateDefaultNumPartitions(int numPartitions) {
+ int maxNumPartitions =
pulsar.getConfiguration().getMaxNumPartitionsPerPartitionedTopic();
+ if (maxNumPartitions == 0 || maxNumPartitions > numPartitions) {
+
this.pulsar.getConfiguration().setDefaultNumPartitions(numPartitions);
+ } else {
+
this.pulsar.getConfiguration().setDefaultNumPartitions(maxNumPartitions);
+ }
+ }
+
+ private void updateMaxNumPartitionsPerPartitionedTopic(int
maxNumPartitions) {
+ if (maxNumPartitions == 0) {
+
this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(maxNumPartitions);
+ return;
+ }
+ if (this.pulsar.getConfiguration().getDefaultNumPartitions() >
maxNumPartitions) {
+
this.pulsar.getConfiguration().setDefaultNumPartitions(maxNumPartitions);
+ }
+
this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(maxNumPartitions);
+ }
+
private void updateBrokerDispatchThrottlingMaxRate() {
if (brokerDispatchRateLimiter == null) {
brokerDispatchRateLimiter = new DispatchRateLimiter(this);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
index dc4b3d9d51d..05673613be7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
@@ -152,4 +154,20 @@ public class BrokerServiceAutoSubscriptionCreationTest
extends BrokerTestBase {
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
}
+ @Test
+ public void testDynamicConfigurationTopicAutoSubscriptionCreation()
+ throws PulsarAdminException, PulsarClientException {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation",
"false");
+ String topicString = "persistent://prop/ns-abc/non-partitioned-topic"
+ UUID.randomUUID();
+ String subscriptionName = "non-partitioned-topic-sub";
+ admin.topics().createNonPartitionedTopic(topicString);
+ Assert.assertThrows(PulsarClientException.class,
+ ()->
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe());
+
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation",
"true");
+
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName));
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 44a52d4b85e..793013ea350 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.UUID;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,6 +34,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.Assert;
@@ -420,4 +425,100 @@ public class BrokerServiceAutoTopicCreationTest extends
BrokerTestBase{
ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
+
+ @Test
+ public void testDynamicConfigurationTopicAutoCreationDisable() throws
PulsarAdminException {
+ // test disable AllowAutoTopicCreation
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation",
"false");
+ final String namespaceName = "prop/ns-abc";
+ final String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ + UUID.randomUUID();
+ Assert.assertThrows(PulsarClientException.NotFoundException.class,
+ ()-> pulsarClient.newProducer().topic(topic).create());
+ }
+
+ @Test
+ public void testDynamicConfigurationTopicAutoCreationNonPartitioned()
throws PulsarAdminException, PulsarClientException {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ final String namespaceName = "prop/ns-abc";
+ final String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ + UUID.randomUUID();
+ // test enable AllowAutoTopicCreation, non-partitioned
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation",
"true");
+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreationType",
"non-partitioned");
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ List<String> topics = admin.topics().getList(namespaceName);
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ assertEquals(topics.size(), 1);
+ assertEquals(partitionedTopicList.size(), 0);
+ producer.close();
+ admin.topics().delete(topic);
+ }
+
+ @Test
+ public void testDynamicConfigurationTopicAutoCreationPartitioned() throws
PulsarAdminException, PulsarClientException {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
+ final String namespaceName = "prop/ns-abc";
+ final String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ + UUID.randomUUID();
+ // test enable AllowAutoTopicCreation, partitioned
+
admin.brokers().updateDynamicConfigurationAsync("allowAutoTopicCreation",
"true");
+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic",
"6");
+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreationType",
"partitioned");
+ admin.brokers().updateDynamicConfiguration("defaultNumPartitions",
"4");
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ List<String> topics = admin.topics().getList(namespaceName);
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ PartitionedTopicMetadata partitionedTopicMetadata =
admin.topics().getPartitionedTopicMetadata(topic);
+ assertEquals(topics.size(), 4);
+ assertEquals(partitionedTopicList.size(), 1);
+ assertEquals(partitionedTopicMetadata.partitions, 4);
+ producer.close();
+ for (String t : topics) {
+ admin.topics().delete(t);
+ }
+ }
+
+ @Test
+ public void
testDynamicConfigurationTopicAutoCreationPartitionedWhenDefaultMoreThanMax()
throws PulsarAdminException, PulsarClientException {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
+ final String namespaceName = "prop/ns-abc";
+ String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ + UUID.randomUUID();
+ // test enable AllowAutoTopicCreation, partitioned when
maxNumPartitionsPerPartitionedTopic < defaultNumPartitions
+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic",
"2");
+ admin.brokers().updateDynamicConfiguration("defaultNumPartitions",
"6");
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ List<String> topics = admin.topics().getList(namespaceName);
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ PartitionedTopicMetadata partitionedTopicMetadata =
admin.topics().getPartitionedTopicMetadata(topic);
+ assertEquals(topics.size(), 2);
+ assertEquals(partitionedTopicList.size(), 1);
+ assertEquals(partitionedTopicMetadata.partitions, 2);
+ producer.close();
+ for (String t : topics) {
+ admin.topics().delete(t);
+ }
+
+ // set maxNumPartitionsPerPartitionedTopic, make
maxNumPartitionsPerPartitionedTopic < defaultNumPartitions
+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic",
"1");
+ topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ + UUID.randomUUID();
+ producer = pulsarClient.newProducer().topic(topic).create();
+ topics = admin.topics().getList(namespaceName);
+ assertEquals(topics.size(), 1);
+ producer.close();
+ for (String t : topics) {
+ admin.topics().delete(t);
+ }
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index e1ba3efd020..4a372c31b15 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashMap;
@@ -596,4 +597,63 @@ public class InactiveTopicDeleteTest extends
BrokerTestBase {
Assert.assertTrue(V1Partitions.contains(healthCheckTopicV1));
Assert.assertTrue(V2Partitions.contains(healthCheckTopicV2));
}
+
+ @Test
+ public void testDynamicConfigurationBrokerDeleteInactiveTopicsEnabled()
throws Exception {
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ super.baseSetup();
+
admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsEnabled",
"false");
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
+ assertFalse(conf.isBrokerDeleteInactiveTopicsEnabled());
+ });
+ }
+
+ @Test
+ public void
testDynamicConfigurationBrokerDeleteInactiveTopicsFrequencySeconds() throws
Exception {
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(30);
+ super.baseSetup();
+ admin.brokers()
+
.updateDynamicConfiguration("brokerDeleteInactiveTopicsFrequencySeconds", "60");
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
+ assertEquals(conf.getBrokerDeleteInactiveTopicsFrequencySeconds(),
60);
+ });
+ }
+
+ @Test
+ public void
testDynamicConfigurationBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
throws Exception {
+ conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(30);
+ super.baseSetup();
+ admin.brokers()
+
.updateDynamicConfiguration("brokerDeleteInactiveTopicsMaxInactiveDurationSeconds",
"60");
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
+
assertEquals(conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
60);
+ });
+ }
+
+ @Test
+ public void testDynamicConfigurationBrokerDeleteInactiveTopicsMode()
throws Exception {
+ conf.setBrokerDeleteInactiveTopicsMode
(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ super.baseSetup();
+ String expect =
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up.toString();
+ admin.brokers()
+ .updateDynamicConfiguration("brokerDeleteInactiveTopicsMode",
+ expect);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
+ assertEquals(conf.getBrokerDeleteInactiveTopicsMode().toString(),
expect);
+ });
+ }
+
+ @Test
+ public void testBrokerDeleteInactivePartitionedTopicMetadataEnabled()
throws Exception {
+ conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
+ super.baseSetup();
+ admin.brokers()
+
.updateDynamicConfiguration("brokerDeleteInactivePartitionedTopicMetadataEnabled",
+ "true");
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
+
assertTrue(conf.isBrokerDeleteInactivePartitionedTopicMetadataEnabled());
+ });
+ }
+
+
}
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index d8e3bf07aa8..ab20f653dde 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -185,14 +185,15 @@ Pulsar brokers are responsible for handling incoming
messages from producers, di
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have
reached the quota |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit.
Being less than 0 means no limitation. By default, it is -1. | -1 |
|backlogQuotaDefaultRetentionPolicy|The defaulted backlog quota retention
policy. By Default, it is `producer_request_hold`. <li>'producer_request_hold'
Policy which holds producer's send request until the resource becomes available
(or holding times out)</li> <li>'producer_exception' Policy which throws
`javax.jms.ResourceAllocationException` to the producer
</li><li>'consumer_backlog_eviction' Policy which evicts the oldest message
from the slowest consumer's backlog</li>|producer_requ [...]
-|allowAutoTopicCreation| Enable topic auto creation if a new producer or
consumer connected |true|
-|allowAutoTopicCreationType| The type of topic that is allowed to be
automatically created.(partitioned/non-partitioned) |non-partitioned|
-|allowAutoSubscriptionCreation| Enable subscription auto creation if a new
consumer connected |true|
-|defaultNumPartitions| The number of partitioned topics that is allowed to be
automatically created if `allowAutoTopicCreationType` is partitioned |1|
-|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. If
topics are not consumed for some while, these inactive topics might be cleaned
up. Deleting inactive topics is enabled by default. The default period is 1
minute. |true|
-|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive
topics |60|
-| brokerDeleteInactiveTopicsMode | Set the mode to delete inactive topics.
<li> `delete_when_no_subscriptions`: delete the topic which has no
subscriptions or active producers. </li><li>
`delete_when_subscriptions_caught_up`: delete the topic whose subscriptions
have no backlogs and which has no active producers or consumers. </li>|
`delete_when_no_subscriptions` |
-| brokerDeleteInactiveTopicsMaxInactiveDurationSeconds | Set the maximum
duration for inactive topics. If it is not specified, the
`brokerDeleteInactiveTopicsFrequencySeconds` parameter is adopted. | N/A |
+|allowAutoTopicCreation| (Dynamic configuration) Enable topic auto creation
when a new producer or consumer connects. |true|
+|allowAutoTopicCreationType| (Dynamic configuration) The type of topic that is
allowed to be automatically created.(partitioned/non-partitioned)
|non-partitioned|
+|allowAutoSubscriptionCreation| (Dynamic configuration) Enable subscription
auto creation when a new consumer connects. |true|
+|defaultNumPartitions| (Dynamic configuration) The number of partitions of
an automatically created partitioned topic; applies only if
`allowAutoTopicCreationType` is partitioned |1|
+|brokerDeleteInactiveTopicsEnabled| (Dynamic configuration) Enable the
deletion of inactive topics. If topics are not consumed for a while, these
inactive topics might be cleaned up. Deleting inactive topics is enabled by
default. The default period is 1 minute. |true|
+|brokerDeleteInactiveTopicsFrequencySeconds| (Dynamic configuration) How often
to check for inactive topics |60|
+| brokerDeleteInactiveTopicsMode | (Dynamic configuration) Set the mode to
delete inactive topics. <li> `delete_when_no_subscriptions`: delete the topic
which has no subscriptions or active producers. </li><li>
`delete_when_subscriptions_caught_up`: delete the topic whose subscriptions
have no backlogs and which has no active producers or consumers. </li>|
`delete_when_no_subscriptions` |
+| brokerDeleteInactiveTopicsMaxInactiveDurationSeconds | (Dynamic
configuration) Set the maximum duration for inactive topics. If it is not
specified, the `brokerDeleteInactiveTopicsFrequencySeconds` parameter is
adopted. | N/A |
+| brokerDeleteInactivePartitionedTopicMetadataEnabled | (Dynamic
configuration) Metadata of inactive partitioned topics is not
automatically cleaned up by default. Note: If `allowAutoTopicCreation` and this
option are enabled at the same time, it may appear that a partitioned topic has
just been deleted but is automatically created as a non-partitioned topic. |
false |
|forceDeleteTenantAllowed| Enable you to delete a tenant forcefully. |false|
|forceDeleteNamespaceAllowed| Enable you to delete a namespace forcefully.
|false|
|messageExpiryCheckIntervalInMinutes| The frequency of proactively checking
and purging expired messages. |5|
@@ -205,7 +206,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated
backlog size is greater
|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no
version information |true|
|statusFilePath| Path for the file used to determine the rotation status for
the broker when responding to service discovery health checks ||
|preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the
load manager will attempt to use only brokers running the latest software
version (to minimize impact to bundles) |false|
-|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned
topic. Use 0 or negative number to disable the check|0|
+|maxNumPartitionsPerPartitionedTopic| (Dynamic configuration) Max number of
partitions per partitioned topic. Use 0 or negative number to disable the
check|0|
| maxSubscriptionsPerTopic | Maximum number of subscriptions allowed to
subscribe to a topic. Once this limit reaches, the broker rejects new
subscriptions until the number of subscriptions decreases. When the value is
set to 0, the limit check is disabled. | 0 |
| maxProducersPerTopic | Maximum number of producers allowed to connect to a
topic. Once this limit reaches, the broker rejects new producers until the
number of connected producers decreases. When the value is set to 0, the limit
check is disabled. | 0 |
| maxConsumersPerTopic | Maximum number of consumers allowed to connect to a
topic. Once this limit reaches, the broker rejects new consumers until the
number of connected consumers decreases. When the value is set to 0, the limit
check is disabled. | 0 |