This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new be5c038 Support configure max subscriptions per topic on the
namespace level policy (#8924)
be5c038 is described below
commit be5c038e8884216a3f16cb982c32c17beb1cb0b6
Author: feynmanlin <[email protected]>
AuthorDate: Sat Dec 12 13:10:49 2020 +0800
Support configure max subscriptions per topic on the namespace level policy
(#8924)
Master Issue: #8866
Currently, #8289 introduced max subscriptions per topic at the broker level,
but it does not support overriding that limit at the namespace level.
Add an API for the namespace-level policy.
Tests added in AdminApiTest2.java:
1) Verify that the basic API is correct
2) Verify that the restriction is in effect
3) Verify that the namespace-level setting takes priority over the broker-level setting
(cherry picked from commit b63e288e09b12b293446ce46573ac6bcb8249e87)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 15 +++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 38 +++++++
.../pulsar/broker/service/AbstractTopic.java | 2 +
.../broker/service/persistent/PersistentTopic.java | 12 +-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 124 ++++++++++++++++++++-
.../org/apache/pulsar/client/admin/Namespaces.java | 50 +++++++++
.../client/admin/internal/NamespacesImpl.java | 80 +++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 44 ++++++++
.../pulsar/common/policies/data/Policies.java | 2 +
.../pulsar/common/policies/data/PolicyName.java | 1 +
11 files changed, 371 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4ce489c..df18b53 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2685,6 +2685,21 @@ public abstract class NamespacesBase extends
AdminResource {
return
getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}
+ protected Integer internalGetMaxSubscriptionsPerTopic() {
+ validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
+ return getNamespacePolicies(namespaceName).max_subscriptions_per_topic;
+ }
+
+ protected void internalSetMaxSubscriptionsPerTopic(Integer
maxSubscriptionsPerTopic){
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+ if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "maxSubscriptionsPerTopic must be 0 or more");
+ }
+ internalSetPolicies("max_subscriptions_per_topic",
maxSubscriptionsPerTopic);
+ }
+
protected void internalSetMaxUnackedMessagesPerSubscription(int
maxUnackedMessagesPerSubscription) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9b9530e..6befe4d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1057,6 +1057,44 @@ public class Namespaces extends NamespacesBase {
internalSetMaxUnackedMessagesPerSubscription(maxUnackedMessagesPerSubscription);
}
+ @GET
+ @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
+ @ApiOperation(value = "Get maxSubscriptionsPerTopic config on a
namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public Integer getMaxSubscriptionsPerTopic(@PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetMaxSubscriptionsPerTopic();
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
+ @ApiOperation(value = " Set maxSubscriptionsPerTopic configuration on a
namespace.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message =
"maxUnackedMessagesPerSubscription value is not valid")})
+ public void setMaxSubscriptionsPerTopic(
+ @PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Number of maximum subscriptions per topic",
required = true)
+ int maxSubscriptionsPerTopic) {
+ validateNamespaceName(tenant, namespace);
+ internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/maxSubscriptionsPerTopic")
+ @ApiOperation(value = "Remove maxSubscriptionsPerTopic configuration on a
namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void removeMaxSubscriptionsPerTopic(@PathParam("tenant") String
tenant,
+ @PathParam("namespace")
String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetMaxSubscriptionsPerTopic(null);
+ }
+
@POST
@Path("/{tenant}/{namespace}/antiAffinity")
@ApiOperation(value = "Set anti-affinity group for a namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index dde9ad6..429248a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -92,6 +92,8 @@ public abstract class AbstractTopic implements Topic {
protected volatile int maxUnackedMessagesOnConsumer = -1;
+ protected volatile Integer maxSubscriptionsPerTopic = null;
+
protected volatile PublishRateLimiter topicPublishRateLimiter;
protected boolean preciseTopicPublishRateLimitingEnable;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 02fdf74..3de20c5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1982,6 +1982,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
maxUnackedMessagesOnSubscription =
unackedMessagesExceededOnSubscription(data);
+ maxSubscriptionsPerTopic = data.max_subscriptions_per_topic;
if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis =
data.delayed_delivery_policies.getTickTime();
@@ -2671,9 +2672,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private boolean checkMaxSubscriptionsPerTopicExceed() {
- final int maxSubscriptionsPerTopic =
brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
- if (maxSubscriptionsPerTopic > 0) {
- if (subscriptions != null && subscriptions.size() >=
maxSubscriptionsPerTopic) {
+ Integer maxSubsPerTopic = maxSubscriptionsPerTopic;
+
+ if (maxSubsPerTopic == null) {
+ maxSubsPerTopic =
brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
+ }
+
+ if (maxSubsPerTopic > 0) {
+ if (subscriptions != null && subscriptions.size() >=
maxSubsPerTopic) {
return true;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index c532c98..dd4785f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -32,7 +32,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
+import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -87,6 +87,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1427,4 +1428,125 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
consumer2.close();
admin.topics().deletePartitionedTopic(topic);
}
+
+ @Test(timeOut = 30000)
+ public void testMaxSubPerTopicApi() throws Exception {
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
+
assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));
+
+ admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100);
+
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100);
+ admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
+
assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));
+
+
admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get();
+
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200);
+ admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace);
+
assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get());
+
+ try {
+ admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100);
+ fail("should fail");
+ } catch (PulsarAdminException ignore) {
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testMaxSubPerTopic() throws Exception {
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace +
"/testMaxSubPerTopic";
+ pulsarClient.newProducer().topic(topic).create().close();
+ final int maxSub = 2;
+ admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ Field field =
PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
+ field.setAccessible(true);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int)
field.get(persistentTopic) == maxSub);
+
+ List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
+ for (int i = 0; i < maxSub; i++) {
+ Consumer<?> consumer =
+
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
+ consumerList.add(consumer);
+ }
+ //Create a client that can fail quickly
+ try (PulsarClient client =
PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
+ .serviceUrl(brokerUrl.toString()).build()){
+
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
+ fail("should fail");
+ } catch (Exception ignore) {
+ }
+ //After removing the restriction, it should be able to create normally
+ admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
field.get(persistentTopic) == null);
+ Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
+ .subscribe();
+ consumerList.add(consumer);
+
+ for (Consumer<?> c : consumerList) {
+ c.close();
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testMaxSubPerTopicPriority() throws Exception {
+ final int brokerLevelMaxSub = 2;
+ super.internalCleanup();
+ mockPulsarSetup.cleanup();
+ conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub);
+ super.internalSetup();
+
+ admin.clusters().createCluster("test", new
ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("prop-xyz", tenantInfo);
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace +
"/testMaxSubPerTopic";
+ //Create a client that can fail quickly
+ PulsarClient client =
PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
+ .serviceUrl(brokerUrl.toString()).build();
+ //We can only create 2 consumers
+ List<Consumer<?>> consumerList = new ArrayList<>(brokerLevelMaxSub);
+ for (int i = 0; i < brokerLevelMaxSub; i++) {
+ Consumer<?> consumer =
+
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
+ consumerList.add(consumer);
+ }
+ try {
+
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
+ fail("should fail");
+ } catch (Exception ignore) {
+
+ }
+ //Set namespace-level policy,the limit should up to 4
+ final int nsLevelMaxSub = 4;
+ admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,
nsLevelMaxSub);
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ Field field =
PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
+ field.setAccessible(true);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int)
field.get(persistentTopic) == nsLevelMaxSub);
+ Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
+ .subscribe();
+ consumerList.add(consumer);
+ assertEquals(consumerList.size(), 3);
+ //After removing the restriction, it should fail again
+ admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
field.get(persistentTopic) == null);
+ try {
+
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
+ fail("should fail");
+ } catch (Exception ignore) {
+
+ }
+
+ for (Consumer<?> c : consumerList) {
+ c.close();
+ }
+ client.close();
+ }
+
+
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 48a11cf..a4637c4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2454,6 +2454,56 @@ public interface Namespaces {
CompletableFuture<Void> removeDeduplicationSnapshotIntervalAsync(String
namespace);
/**
+ * Get the maxSubscriptionsPerTopic for a namespace.
+ *
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ Integer getMaxSubscriptionsPerTopic(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Get the maxSubscriptionsPerTopic for a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String
namespace);
+
+ /**
+ * Set the maxSubscriptionsPerTopic for a namespace.
+ *
+ * @param namespace
+ * @param maxSubscriptionsPerTopic
+ * @throws PulsarAdminException
+ */
+ void setMaxSubscriptionsPerTopic(String namespace, int
maxSubscriptionsPerTopic) throws PulsarAdminException;
+
+ /**
+ * Set the maxSubscriptionsPerTopic for a namespace asynchronously.
+ *
+ * @param namespace
+ * @param maxSubscriptionsPerTopic
+ * @return
+ */
+ CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String namespace,
int maxSubscriptionsPerTopic);
+
+ /**
+ * Remove the maxSubscriptionsPerTopic for a namespace.
+ *
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeMaxSubscriptionsPerTopic(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Remove the maxSubscriptionsPerTopic for a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String
namespace);
+
+ /**
* Get the maxProducersPerTopic for a namespace.
* <p/>
* Response example:
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 8f59a56..b8e1a67 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2041,6 +2041,86 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public Integer getMaxSubscriptionsPerTopic(String namespace) throws
PulsarAdminException {
+ try {
+ return getMaxSubscriptionsPerTopicAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer maxSubscriptionsPerTopic) {
+ future.complete(maxSubscriptionsPerTopic);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setMaxSubscriptionsPerTopic(String namespace, int
maxSubscriptionsPerTopic)
+ throws PulsarAdminException {
+ try {
+ setMaxSubscriptionsPerTopicAsync(namespace,
maxSubscriptionsPerTopic).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String
namespace, int maxSubscriptionsPerTopic) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
+ return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeMaxSubscriptionsPerTopic(String namespace) throws
PulsarAdminException {
+ try {
+ removeMaxSubscriptionsPerTopicAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public int getMaxProducersPerTopic(String namespace) throws
PulsarAdminException {
try {
return getMaxProducersPerTopicAsync(namespace).
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 927ee73..dac618d 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -355,6 +355,13 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-persistence myprop/clust/ns1"));
verify(mockNamespaces).getPersistence("myprop/clust/ns1");
+ namespaces.run(split("get-max-subscriptions-per-topic
myprop/clust/ns1"));
+ verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1");
+ namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1
-m 300"));
+ verify(mockNamespaces).setMaxSubscriptionsPerTopic("myprop/clust/ns1",
300);
+ namespaces.run(split("remove-max-subscriptions-per-topic
myprop/clust/ns1"));
+
verify(mockNamespaces).removeMaxSubscriptionsPerTopic("myprop/clust/ns1");
+
namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300"));
verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7247f41..d2052cd 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -330,6 +330,46 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get max subscriptions per topic for a
namespace")
+ private class GetMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set max subscriptions per topic for a
namespace")
+ private class SetMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--maxSubscriptionsPerTopic", "-m" }, description
= "Max subscriptions per topic",
+ required = true)
+ private int maxSubscriptionsPerTopic;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setMaxSubscriptionsPerTopic(namespace,
maxSubscriptionsPerTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max subscriptions per topic for a
namespace")
+ private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeMaxSubscriptionsPerTopic(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Set subscription expiration time for a
namespace")
private class SetSubscriptionExpirationTime extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1815,6 +1855,10 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
+ jcommander.addCommand("get-max-subscriptions-per-topic", new
GetMaxSubscriptionsPerTopic());
+ jcommander.addCommand("set-max-subscriptions-per-topic", new
SetMaxSubscriptionsPerTopic());
+ jcommander.addCommand("remove-max-subscriptions-per-topic", new
RemoveMaxSubscriptionsPerTopic());
+
jcommander.addCommand("get-subscription-expiration-time", new
GetSubscriptionExpirationTime());
jcommander.addCommand("set-subscription-expiration-time", new
SetSubscriptionExpirationTime());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 969c0c0..a7ee4f7 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -86,6 +86,8 @@ public class Policies {
public int max_unacked_messages_per_consumer = -1;
@SuppressWarnings("checkstyle:MemberName")
public int max_unacked_messages_per_subscription = -1;
+ @SuppressWarnings("checkstyle:MemberName")
+ public Integer max_subscriptions_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public long compaction_threshold = 0;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index b7f8f6a..c8fa3d7 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -34,6 +34,7 @@ public enum PolicyName {
MAX_PRODUCERS,
DEDUPLICATION_SNAPSHOT,
MAX_UNACKED,
+ MAX_SUBSCRIPTIONS,
OFFLOAD,
PERSISTENCE,
RATE,