This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 036e040f5232904cbb138eb5b40ffca91fdbd1ec Author: feynmanlin <[email protected]> AuthorDate: Wed Dec 16 01:40:31 2020 +0800 Support configure max subscriptions per topic on the topic level policy (#8948) Master Issue: #8866 Currently, #8289 introduced max subscriptions per topic at the broker level but does not support overwrite on the namespace level and topic level. add topic-level max subscriptions per topic API 1.Verify that the basic API is correct 2.Verify that the poilcy priority of the broker, namespace, and topic levels is correct. 3.Verify that the policy of each level is correct (cherry picked from commit 911a4f9a6031703535b6f8a4fa671aaf51ac8230) --- .../broker/admin/impl/PersistentTopicsBase.java | 249 +++++---------------- .../pulsar/broker/admin/v2/PersistentTopics.java | 85 +++++++ .../broker/service/persistent/PersistentTopic.java | 10 +- .../pulsar/broker/admin/TopicPoliciesTest.java | 113 ++++++++++ .../org/apache/pulsar/client/admin/Topics.java | 53 +++++ .../pulsar/client/admin/internal/TopicsImpl.java | 77 +++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 7 + .../org/apache/pulsar/admin/cli/CmdTopics.java | 44 ++++ .../pulsar/common/policies/data/TopicPolicies.java | 5 + 9 files changed, 446 insertions(+), 197 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 61a8310..16a91f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2461,12 +2461,7 @@ public class PersistentTopicsBase extends AdminResource { if (ttlInSecond != null && ttlInSecond < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); } - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies; //Update existing topic policy or create a new one if not exist. try { @@ -2517,12 +2512,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalGetRetention(AsyncResponse asyncResponse){ - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<RetentionPolicies> retention = getTopicPolicies(topicName) .map(TopicPolicies::getRetentionPolicies); if (!retention.isPresent()) { @@ -2533,12 +2523,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); if (retention == null) { return CompletableFuture.completedFuture(null); } @@ -2563,12 +2548,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveRetention() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -2578,22 +2558,12 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<PersistencePolicies> internalGetPersistence(){ - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getPersistence); } protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); validatePersistencePolicies(persistencePolicies); TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); @@ -2602,12 +2572,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemovePersistence() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -2623,12 +2588,7 @@ public class PersistentTopicsBase extends AdminResource { "and must be smaller than that in the broker-level"); } - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); topicPolicies.setMaxMessageSize(maxMessageSize); @@ -2636,22 +2596,12 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<Integer> internalGetMaxMessageSize() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize); } protected Optional<Integer> internalGetMaxProducers() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic); } @@ -2661,25 +2611,41 @@ public class PersistentTopicsBase extends AdminResource { "maxProducers must be 0 or more"); } - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); topicPolicies.setMaxProducerPerTopic(maxProducers); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } - protected CompletableFuture<Void> internalRemoveMaxProducers() { + protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() { + preValidation(); + return getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic); + } + + protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) { + if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { + throw new RestException(Status.PRECONDITION_FAILED, + "maxSubscriptionsPerTopic must be 0 or more"); + } + preValidation(); + + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + + private void preValidation() { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } checkTopicLevelPolicyEnable(); + } + + protected CompletableFuture<Void> internalRemoveMaxProducers() { + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -2689,12 +2655,7 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<Integer> internalGetMaxConsumers() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic); } @@ -2703,13 +2664,7 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, "maxConsumers must be 0 or more"); } - - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); topicPolicies.setMaxConsumerPerTopic(maxConsumers); @@ -2717,12 +2672,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveMaxConsumers() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3419,22 +3369,12 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<DispatchRate> internalGetDispatchRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate); } protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); if (dispatchRate == null) { return CompletableFuture.completedFuture(null); } @@ -3445,12 +3385,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveDispatchRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3461,22 +3396,12 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate); } protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); if (dispatchRate == null) { return CompletableFuture.completedFuture(null); } @@ -3487,12 +3412,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3503,12 +3423,7 @@ public class PersistentTopicsBase extends AdminResource { protected Optional<Integer> internalGetMaxConsumersPerSubscription() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription); } @@ -3516,12 +3431,7 @@ public class PersistentTopicsBase extends AdminResource { if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription"); } - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies = getTopicPolicies(topicName) .orElseGet(TopicPolicies::new); @@ -3530,12 +3440,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3545,12 +3450,7 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<Long> internalGetCompactionThreshold() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold); } @@ -3558,12 +3458,7 @@ public class PersistentTopicsBase extends AdminResource { if (compactionThreshold != null && compactionThreshold < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold"); } - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); TopicPolicies topicPolicies = getTopicPolicies(topicName) .orElseGet(TopicPolicies::new); @@ -3572,12 +3467,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveCompactionThreshold() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3587,23 +3477,13 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<PublishRate> internalGetPublishRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate); } protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); if (publishRate == null) { return CompletableFuture.completedFuture(null); } @@ -3614,12 +3494,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemovePublishRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); @@ -3629,23 +3504,12 @@ public class PersistentTopicsBase extends AdminResource { } protected Optional<SubscribeRate> internalGetSubscribeRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - - checkTopicLevelPolicyEnable(); + preValidation(); return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate); } protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); if (subscribeRate == null) { return CompletableFuture.completedFuture(null); } @@ -3656,12 +3520,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveSubscribeRate() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); + preValidation(); Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 861fff4..ae3d734 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1760,6 +1760,91 @@ public class PersistentTopics extends PersistentTopicsBase { } @GET + @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic") + @ApiOperation(value = "Get maxSubscriptionsPerTopic config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic(); + if (!maxSubscriptionsPerTopic.isPresent()) { + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(maxSubscriptionsPerTopic.get()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic") + @ApiOperation(value = "Set maxSubscriptionsPerTopic config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")}) + public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Updating maxSubscriptionsPerTopic failed", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Updating maxSubscriptionsPerTopic failed", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}" + + ", maxSubscriptions={}" + , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic") + @ApiOperation(value = "Remove maxSubscriptionsPerTopic config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove maxSubscriptionsPerTopic", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove maximum subscription limit: namespace={}, topic={}", + clientAppId(), + namespaceName, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @GET @Path("/{tenant}/{namespace}/{topic}/maxProducers") @ApiOperation(value = "Get maxProducers config for specified topic.") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 20733e1..a27147e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2670,8 +2670,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } private boolean checkMaxSubscriptionsPerTopicExceed() { - Integer maxSubsPerTopic = maxSubscriptionsPerTopic; - + TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); + Integer maxSubsPerTopic = null; + if (topicPolicies != null && topicPolicies.isMaxSubscriptionsPerTopicSet()) { + maxSubsPerTopic = topicPolicies.getMaxSubscriptionsPerTopic(); + } + if (maxSubsPerTopic == null) { + maxSubsPerTopic = maxSubscriptionsPerTopic; + } if (maxSubsPerTopic == null) { maxSubsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 63d382b..5c3d9ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; @@ -51,6 +52,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -1311,4 +1314,114 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { //should not fail assertNull(admin.topics().getMessageTTL(topic)); } + + @Test(timeOut = 20000) + public void testMaxSubscriptionsPerTopicApi() throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + // init cache + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + + assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic)); + // set max subscriptions + admin.topics().setMaxSubscriptionsPerTopic(topic, 10); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() + -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + assertEquals(admin.topics().getMaxSubscriptionsPerTopic(topic).intValue(), 10); + // remove max subscriptions + admin.topics().removeMaxSubscriptionsPerTopic(topic); + assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic)); + // set invalidate value + try { + admin.topics().setMaxMessageSize(topic, -1); + fail("should fail"); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 412); + } + } + + @Test(timeOut = 20000) + public void testMaxSubscriptionsPerTopic() throws Exception { + int brokerLevelMaxSub = 4; + conf.setMaxSubscriptionsPerTopic(4); + restartBroker(); + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + // init cache + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + // Set topic-level max subscriptions + final int topicLevelMaxSubNum = 2; + admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() + -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + + List<Consumer<String>> consumerList = new ArrayList<>(); + for (int i = 0; i < topicLevelMaxSubNum; i++) { + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()) + .topic(topic).subscribe(); + consumerList.add(consumer); + } + try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS) + .serviceUrl(brokerUrl.toString()).build()) { + consumerList.add(client.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()) + .topic(topic).subscribe()); + fail("should fail"); + } catch (PulsarClientException ignore) { + assertEquals(consumerList.size(), topicLevelMaxSubNum); + } + // Set namespace-level policy, but will not take effect + final int namespaceLevelMaxSub = 3; + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, namespaceLevelMaxSub); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); + field.setAccessible(true); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == namespaceLevelMaxSub); + + try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) + .serviceUrl(brokerUrl.toString()).build()) { + client.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()) + .topic(topic).subscribe(); + fail("should fail"); + } catch (PulsarClientException ignore) { + assertEquals(consumerList.size(), topicLevelMaxSubNum); + } + //Removed topic-level policy, namespace-level should take effect + admin.topics().removeMaxSubscriptionsPerTopic(topic); + consumerList.add(pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe()); + assertEquals(consumerList.size(), namespaceLevelMaxSub); + try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) + .serviceUrl(brokerUrl.toString()).build()) { + consumerList.add(client.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()) + .topic(topic).subscribe()); + fail("should fail"); + } catch (PulsarClientException ignore) { + assertEquals(consumerList.size(), namespaceLevelMaxSub); + } + //Removed namespace-level policy, broker-level should take effect + admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); + consumerList.add(pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe()); + assertEquals(consumerList.size(), brokerLevelMaxSub); + try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) + .serviceUrl(brokerUrl.toString()).build()) { + consumerList.add(client.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()) + .topic(topic).subscribe()); + fail("should fail"); + } catch (PulsarClientException ignore) { + assertEquals(consumerList.size(), brokerLevelMaxSub); + } + //Clean up + for (Consumer<String> c : consumerList) { + c.close(); + } + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 69d4600..d5c35a9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2473,6 +2473,59 @@ public interface Topics { * @param topic Topic name */ CompletableFuture<Void> removeMaxProducersAsync(String topic); + + /** + * Get the max number of subscriptions for specified topic. + * + * @param topic Topic name + * @return Configuration of bookkeeper persistence policies + * @throws PulsarAdminException Unexpected error + */ + Integer getMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException; + + /** + * Get the max number of subscriptions for specified topic asynchronously. + * + * @param topic Topic name + * @return Configuration of bookkeeper persistence policies + * @throws PulsarAdminException Unexpected error + */ + CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic); + + + /** + * Set the max number of subscriptions for specified topic. + * + * @param topic Topic name + * @param maxSubscriptionsPerTopic Max number of subscriptions + * @throws PulsarAdminException Unexpected error + */ + void setMaxSubscriptionsPerTopic(String topic, int maxSubscriptionsPerTopic) throws PulsarAdminException; + + /** + * Set the max number of subscriptions for specified topic asynchronously. + * + * @param topic Topic name + * @param maxSubscriptionsPerTopic Max number of subscriptions + * @throws PulsarAdminException Unexpected error + */ + CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String topic, int maxSubscriptionsPerTopic); + + /** + * Remove the max number of subscriptions for specified topic. + * + * @param topic Topic name + * @throws PulsarAdminException Unexpected error + */ + void removeMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException; + + /** + * Remove the max number of subscriptions for specified topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String topic); + /** * Get the max message size for specified topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 848eabb..314a50b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2645,6 +2645,83 @@ public class TopicsImpl extends BaseResource implements Topics { } @Override + public Integer getMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException { + try { + return getMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic"); + final CompletableFuture<Integer> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Integer>() { + @Override + public void completed(Integer maxSubscriptionsPerTopic) { + future.complete(maxSubscriptionsPerTopic); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setMaxSubscriptionsPerTopic(String topic, int maxSubscriptionsPerTopic) throws PulsarAdminException { + try { + setMaxSubscriptionsPerTopicAsync(topic, maxSubscriptionsPerTopic) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String topic, int maxSubscriptionsPerTopic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic"); + return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException { + try { + removeMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic"); + return asyncDeleteRequest(path); + } + + @Override public Integer getMaxMessageSize(String topic) throws PulsarAdminException { try { return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5e77900..ec13cfb 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -843,6 +843,13 @@ public class PulsarAdminToolTest { verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1" , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); + cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -m 100")); + verify(mockTopics).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 100); + cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1"); + // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a // range of +/- 1 second of the expected timestamp class TimestampMatcher implements ArgumentMatcher<Long> { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 169a4cd..fadadf0 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -184,6 +184,10 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("set-max-producers", new SetMaxProducers()); jcommander.addCommand("remove-max-producers", new RemoveMaxProducers()); + jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic()); + jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic()); + jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic()); + jcommander.addCommand("get-max-message-size", new GetMaxMessageSize()); jcommander.addCommand("set-max-message-size", new SetMaxMessageSize()); jcommander.addCommand("remove-max-message-size", new RemoveMaxMessageSize()); @@ -1796,6 +1800,46 @@ public class CmdTopics extends CmdBase { } } + @Parameters(commandDescription = "Get max number of subscriptions for a topic") + private class GetMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxSubscriptionsPerTopic(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set max number of subscriptions for a topic") + private class SetMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = {"--max-subscriptions-per-topic", "-m"}, + description = "Maximum subscription limit for a topic", required = true) + private int maxSubscriptionsPerTopic; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionsPerTopic); + } + } + + @Parameters(commandDescription = "Remove max number of subscriptions for a topic") + private class RemoveMaxSubscriptionsPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxSubscriptionsPerTopic(persistentTopic); + } + } + @Parameters(commandDescription = "Get max message size for a topic") private class GetMaxMessageSize extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 69d9fa3..1bfeabe 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -58,6 +58,11 @@ public class TopicPolicies { private SubscribeRate subscribeRate = null; private Integer deduplicationSnapshotIntervalSeconds = null; private Integer maxMessageSize = null; + private Integer maxSubscriptionsPerTopic = null; + + public boolean isMaxSubscriptionsPerTopicSet() { + return maxSubscriptionsPerTopic != null; + } public boolean isMaxMessageSizeSet() { return maxMessageSize != null;
