This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 036e040f5232904cbb138eb5b40ffca91fdbd1ec
Author: feynmanlin <[email protected]>
AuthorDate: Wed Dec 16 01:40:31 2020 +0800

    Support configure max subscriptions per topic on the topic level policy 
(#8948)
    
    Master Issue: #8866
    
    Currently, #8289 introduced max subscriptions per topic at the broker level, 
but it cannot be overridden at the namespace level or the topic level.
    
    Add a topic-level API for max subscriptions per topic.
    
    1. Verify that the basic API is correct.
    2. Verify that the policy priority of the broker, namespace, and topic 
levels is correct.
    3. Verify that the policy of each level is correct.
    
    (cherry picked from commit 911a4f9a6031703535b6f8a4fa671aaf51ac8230)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 249 +++++----------------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  85 +++++++
 .../broker/service/persistent/PersistentTopic.java |  10 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 113 ++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  53 +++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  77 +++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   7 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  44 ++++
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 9 files changed, 446 insertions(+), 197 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 61a8310..16a91f3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2461,12 +2461,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (ttlInSecond != null && ttlInSecond < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for message TTL");
         }
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         TopicPolicies topicPolicies;
         //Update existing topic policy or create a new one if not exist.
         try {
@@ -2517,12 +2512,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetRetention(AsyncResponse asyncResponse){
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
                 .map(TopicPolicies::getRetentionPolicies);
         if (!retention.isPresent()) {
@@ -2533,12 +2523,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -2563,12 +2548,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveRetention() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2578,22 +2558,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<PersistencePolicies> internalGetPersistence(){
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
     }
 
     protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         validatePersistencePolicies(persistencePolicies);
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
@@ -2602,12 +2572,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemovePersistence() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2623,12 +2588,7 @@ public class PersistentTopicsBase extends AdminResource {
                     "and must be smaller than that in the broker-level");
         }
 
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxMessageSize(maxMessageSize);
@@ -2636,22 +2596,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<Integer> internalGetMaxMessageSize() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
     }
 
     protected Optional<Integer> internalGetMaxProducers() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
     }
 
@@ -2661,25 +2611,41 @@ public class PersistentTopicsBase extends AdminResource 
{
                     "maxProducers must be 0 or more");
         }
 
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxProducerPerTopic(maxProducers);
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
     }
 
-    protected CompletableFuture<Void> internalRemoveMaxProducers() {
+    protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
+        preValidation();
+        return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
+    }
+
+    protected CompletableFuture<Void> 
internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
+        if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "maxSubscriptionsPerTopic must be 0 or more");
+        }
+        preValidation();
+
+        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+    }
+
+    private void preValidation() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         checkTopicLevelPolicyEnable();
+    }
+
+    protected CompletableFuture<Void> internalRemoveMaxProducers() {
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2689,12 +2655,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected Optional<Integer> internalGetMaxConsumers() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
     }
 
@@ -2703,13 +2664,7 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxConsumers must be 0 or more");
         }
-
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxConsumerPerTopic(maxConsumers);
@@ -2717,12 +2672,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveMaxConsumers() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3419,22 +3369,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<DispatchRate> internalGetDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
     }
 
     protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate 
dispatchRate) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3445,12 +3385,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3461,22 +3396,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
     }
 
     protected CompletableFuture<Void> 
internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3487,12 +3412,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() 
{
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3503,12 +3423,7 @@ public class PersistentTopicsBase extends AdminResource {
 
 
     protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
     }
 
@@ -3516,12 +3431,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (maxConsumersPerSubscription != null && maxConsumersPerSubscription 
< 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for maxConsumersPerSubscription");
         }
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
 
         TopicPolicies topicPolicies = getTopicPolicies(topicName)
                 .orElseGet(TopicPolicies::new);
@@ -3530,12 +3440,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalRemoveMaxConsumersPerSubscription() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3545,12 +3450,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected Optional<Long> internalGetCompactionThreshold() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
     }
 
@@ -3558,12 +3458,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (compactionThreshold != null && compactionThreshold < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for compactionThreshold");
         }
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
 
         TopicPolicies topicPolicies = getTopicPolicies(topicName)
             .orElseGet(TopicPolicies::new);
@@ -3572,12 +3467,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-          validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
           return CompletableFuture.completedFuture(null);
@@ -3587,23 +3477,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<PublishRate> internalGetPublishRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
 
     }
 
     protected CompletableFuture<Void> internalSetPublishRate(PublishRate 
publishRate) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (publishRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3614,12 +3494,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemovePublishRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3629,23 +3504,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<SubscribeRate> internalGetSubscribeRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-
-        checkTopicLevelPolicyEnable();
+        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
     }
 
     protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate 
subscribeRate) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (subscribeRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3656,12 +3520,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveSubscribeRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        checkTopicLevelPolicyEnable();
+        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 861fff4..ae3d734 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1760,6 +1760,91 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Get maxSubscriptionsPerTopic config for specified 
topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> maxSubscriptionsPerTopic = 
internalGetMaxSubscriptionsPerTopic();
+            if (!maxSubscriptionsPerTopic.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(maxSubscriptionsPerTopic.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Set maxSubscriptionsPerTopic config for specified 
topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of 
maxSubscriptionsPerTopic")})
+    public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @ApiParam(value = "The max subscriptions of 
the topic") int maxSubscriptionsPerTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, 
ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully updated maxSubscriptionsPerTopic: 
namespace={}, topic={}"
+                                + ", maxSubscriptions={}"
+                        , clientAppId(), namespaceName, 
topicName.getLocalName(), maxSubscriptionsPerTopic);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Remove maxSubscriptionsPerTopic config for 
specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove maxSubscriptionsPerTopic", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove maximum subscription limit: 
namespace={}, topic={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/maxProducers")
     @ApiOperation(value = "Get maxProducers config for specified topic.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20733e1..a27147e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2670,8 +2670,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     private boolean checkMaxSubscriptionsPerTopicExceed() {
-        Integer maxSubsPerTopic = maxSubscriptionsPerTopic;
-
+        TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
+        Integer maxSubsPerTopic = null;
+        if (topicPolicies != null && 
topicPolicies.isMaxSubscriptionsPerTopicSet()) {
+            maxSubsPerTopic = topicPolicies.getMaxSubscriptionsPerTopic();
+        }
+        if (maxSubsPerTopic == null) {
+            maxSubsPerTopic = maxSubscriptionsPerTopic;
+        }
         if (maxSubsPerTopic == null) {
             maxSubsPerTopic = 
brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 63d382b..5c3d9ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -51,6 +52,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -1311,4 +1314,114 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         //should not fail
         assertNull(admin.topics().getMessageTTL(topic));
     }
+
+    @Test(timeOut = 20000)
+    public void testMaxSubscriptionsPerTopicApi() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+
+        assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic));
+        // set max subscriptions
+        admin.topics().setMaxSubscriptionsPerTopic(topic, 10);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> 
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != 
null);
+        
assertEquals(admin.topics().getMaxSubscriptionsPerTopic(topic).intValue(), 10);
+        // remove max subscriptions
+        admin.topics().removeMaxSubscriptionsPerTopic(topic);
+        assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic));
+        // set invalid value
+        try {
+            admin.topics().setMaxMessageSize(topic, -1);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testMaxSubscriptionsPerTopic() throws Exception {
+        int brokerLevelMaxSub = 4;
+        conf.setMaxSubscriptionsPerTopic(4);
+        restartBroker();
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        // Set topic-level max subscriptions
+        final int topicLevelMaxSubNum = 2;
+        admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> 
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != 
null);
+
+        List<Consumer<String>> consumerList = new ArrayList<>();
+        for (int i = 0; i < topicLevelMaxSubNum; i++) {
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe();
+            consumerList.add(consumer);
+        }
+        try (PulsarClient client = PulsarClient.builder().operationTimeout(2, 
TimeUnit.SECONDS)
+                .serviceUrl(brokerUrl.toString()).build()) {
+            consumerList.add(client.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe());
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertEquals(consumerList.size(), topicLevelMaxSubNum);
+        }
+        // Set namespace-level policy, but will not take effect
+        final int namespaceLevelMaxSub = 3;
+        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, 
namespaceLevelMaxSub);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        Field field = 
PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
+        field.setAccessible(true);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) 
field.get(persistentTopic) == namespaceLevelMaxSub);
+
+        try (PulsarClient client = 
PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
+                .serviceUrl(brokerUrl.toString()).build()) {
+            client.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertEquals(consumerList.size(), topicLevelMaxSubNum);
+        }
+        //Removed topic-level policy, namespace-level should take effect
+        admin.topics().removeMaxSubscriptionsPerTopic(topic);
+        consumerList.add(pulsarClient.newConsumer(Schema.STRING)
+                
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
+        assertEquals(consumerList.size(), namespaceLevelMaxSub);
+        try (PulsarClient client = 
PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
+                .serviceUrl(brokerUrl.toString()).build()) {
+            consumerList.add(client.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe());
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertEquals(consumerList.size(), namespaceLevelMaxSub);
+        }
+        //Removed namespace-level policy, broker-level should take effect
+        admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
field.get(persistentTopic) == null);
+        consumerList.add(pulsarClient.newConsumer(Schema.STRING)
+                
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
+        assertEquals(consumerList.size(), brokerLevelMaxSub);
+        try (PulsarClient client = 
PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
+                .serviceUrl(brokerUrl.toString()).build()) {
+            consumerList.add(client.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe());
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertEquals(consumerList.size(), brokerLevelMaxSub);
+        }
+        //Clean up
+        for (Consumer<String> c : consumerList) {
+            c.close();
+        }
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 69d4600..d5c35a9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2473,6 +2473,59 @@ public interface Topics {
      * @param topic Topic name
      */
     CompletableFuture<Void> removeMaxProducersAsync(String topic);
+
+    /**
+     * Get the max number of subscriptions for specified topic.
+     *
+     * @param topic Topic name
+     * @return Max number of subscriptions, or null if not set on the topic
+     * @throws PulsarAdminException Unexpected error
+     */
+    Integer getMaxSubscriptionsPerTopic(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Get the max number of subscriptions for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     * @return Future of the max number of subscriptions (null if not set)
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic);
+
+
+    /**
+     * Set the max number of subscriptions for specified topic.
+     *
+     * @param topic Topic name
+     * @param maxSubscriptionsPerTopic Max number of subscriptions
+     * @throws PulsarAdminException Unexpected error
+     */
+    void setMaxSubscriptionsPerTopic(String topic, int 
maxSubscriptionsPerTopic) throws PulsarAdminException;
+
+    /**
+     * Set the max number of subscriptions for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     * @param maxSubscriptionsPerTopic Max number of subscriptions
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String topic, int 
maxSubscriptionsPerTopic);
+
+    /**
+     * Remove the max number of subscriptions for specified topic.
+     *
+     * @param topic Topic name
+     * @throws PulsarAdminException Unexpected error
+     */
+    void removeMaxSubscriptionsPerTopic(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Remove the max number of subscriptions for specified topic 
asynchronously.
+     *
+     * @param topic Topic name
+     */
+    CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String topic);
+
     /**
      * Get the max message size for specified topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 848eabb..314a50b 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2645,6 +2645,83 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public Integer getMaxSubscriptionsPerTopic(String topic) throws 
PulsarAdminException {
+        try {
+            return 
getMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String 
topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer maxSubscriptionsPerTopic) {
+                        future.complete(maxSubscriptionsPerTopic);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxSubscriptionsPerTopic(String topic, int 
maxSubscriptionsPerTopic) throws PulsarAdminException {
+        try {
+            setMaxSubscriptionsPerTopicAsync(topic, maxSubscriptionsPerTopic)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String 
topic, int maxSubscriptionsPerTopic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
+        return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxSubscriptionsPerTopic(String topic) throws 
PulsarAdminException {
+        try {
+            removeMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String 
topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxMessageSize(String topic) throws PulsarAdminException 
{
         try {
             return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5e77900..ec13cfb 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -843,6 +843,13 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
                 , new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true));
 
+        cmdTopics.run(split("get-max-subscriptions-per-topic 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-subscriptions-per-topic 
persistent://myprop/clust/ns1/ds1 -m 100"));
+        
verify(mockTopics).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1",
 100);
+        cmdTopics.run(split("remove-max-subscriptions-per-topic 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+
         // argument matcher for the timestamp in reset cursor. Since we can't 
verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
         class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 169a4cd..fadadf0 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -184,6 +184,10 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-max-producers", new SetMaxProducers());
         jcommander.addCommand("remove-max-producers", new 
RemoveMaxProducers());
 
+        jcommander.addCommand("get-max-subscriptions-per-topic", new 
GetMaxSubscriptionsPerTopic());
+        jcommander.addCommand("set-max-subscriptions-per-topic", new 
SetMaxSubscriptionsPerTopic());
+        jcommander.addCommand("remove-max-subscriptions-per-topic", new 
RemoveMaxSubscriptionsPerTopic());
+
         jcommander.addCommand("get-max-message-size", new GetMaxMessageSize());
         jcommander.addCommand("set-max-message-size", new SetMaxMessageSize());
         jcommander.addCommand("remove-max-message-size", new 
RemoveMaxMessageSize());
@@ -1796,6 +1800,46 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get max number of subscriptions for a 
topic")
+    private class GetMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxSubscriptionsPerTopic(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max number of subscriptions for a 
topic")
+    private class SetMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--max-subscriptions-per-topic", "-m"},
+                description = "Maximum subscription limit for a topic", 
required = true)
+        private int maxSubscriptionsPerTopic;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxSubscriptionsPerTopic(persistentTopic, 
maxSubscriptionsPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max number of subscriptions for a 
topic")
+    private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxSubscriptionsPerTopic(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get max message size for a topic")
     private class GetMaxMessageSize extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 69d9fa3..1bfeabe 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -58,6 +58,11 @@ public class TopicPolicies {
     private SubscribeRate subscribeRate = null;
     private Integer deduplicationSnapshotIntervalSeconds = null;
     private Integer maxMessageSize = null;
+    private Integer maxSubscriptionsPerTopic = null;
+
+    public boolean isMaxSubscriptionsPerTopicSet() {
+        return maxSubscriptionsPerTopic != null;
+    }
 
     public boolean isMaxMessageSizeSet() {
         return maxMessageSize != null;

Reply via email to