This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 6538d32  Fix topic ownership is not checked when get topic policy 
(target branch-2.7) (#9781)
6538d32 is described below

commit 6538d32e6a9728957ae07ff2863d92d962b577a8
Author: Yong Zhang <[email protected]>
AuthorDate: Fri Mar 5 10:18:34 2021 +0800

    Fix topic ownership is not checked when get topic policy (target 
branch-2.7) (#9781)
    
    ### Motivation
    1. Currently, the API of topic policies does not check the ownership of 
topic. In the case of multiple brokers, a `cache not init error` will appear.
    
    2. The parameter validation of the read and write API should not be the 
same, and the read API should not verify `validatePoliciesReadOnlyAccess()`. If 
`ReadOnly` is set, the read API will also be abnormal.
    
    3. General API should not use `validateAdminAccessForTenant()`.
    
    The original PR is #9767
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 51 +-----------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 97 +++++++++++++---------
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 69 ++++++++++++++-
 .../broker/auth/MockedPulsarServiceBaseTest.java   | 14 +++-
 4 files changed, 139 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b2c2324..6b5fc60 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2401,15 +2401,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalSetBacklogQuota(AsyncResponse asyncResponse, 
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
         if (backlogQuotaType == null) {
             backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
         }
-        checkTopicLevelPolicyEnable();
         TopicPolicies topicPolicies;
         try {
             topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
@@ -2476,7 +2470,6 @@ public class PersistentTopicsBase extends AdminResource {
         if (ttlInSecond != null && ttlInSecond < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for message TTL");
         }
-        preValidation();
         TopicPolicies topicPolicies;
         //Update existing topic policy or create a new one if not exist.
         try {
@@ -2527,7 +2520,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetRetention(AsyncResponse asyncResponse){
-        preValidation();
         Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
                 .map(TopicPolicies::getRetentionPolicies);
         if (!retention.isPresent()) {
@@ -2538,7 +2530,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention) {
-        preValidation();
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -2563,7 +2554,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveRetention() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2573,12 +2563,10 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<PersistencePolicies> internalGetPersistence(){
-        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
     }
 
     protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies) {
-        preValidation();
         validatePersistencePolicies(persistencePolicies);
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
@@ -2587,7 +2575,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemovePersistence() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2603,20 +2590,16 @@ public class PersistentTopicsBase extends AdminResource 
{
                     "and must be smaller than that in the broker-level");
         }
 
-        preValidation();
-
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxMessageSize(maxMessageSize);
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
     }
 
     protected Optional<Integer> internalGetMaxMessageSize() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
     }
 
     protected Optional<Integer> internalGetMaxProducers() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
     }
 
@@ -2625,16 +2608,12 @@ public class PersistentTopicsBase extends AdminResource 
{
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxProducers must be 0 or more");
         }
-
-        preValidation();
-
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxProducerPerTopic(maxProducers);
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
     }
 
     protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
     }
 
@@ -2643,24 +2622,21 @@ public class PersistentTopicsBase extends AdminResource 
{
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxSubscriptionsPerTopic must be 0 or more");
         }
-        preValidation();
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
     }
 
-    private void preValidation() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
+    protected void preValidation() {
+        checkTopicLevelPolicyEnable();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        checkTopicLevelPolicyEnable();
+        validateTopicOwnership(topicName, false);
     }
 
     protected CompletableFuture<Void> internalRemoveMaxProducers() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -2670,7 +2646,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected Optional<Integer> internalGetMaxConsumers() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
     }
 
@@ -2679,7 +2654,6 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxConsumers must be 0 or more");
         }
-        preValidation();
 
         TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
         topicPolicies.setMaxConsumerPerTopic(maxConsumers);
@@ -2687,7 +2661,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveMaxConsumers() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3492,12 +3465,10 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<DispatchRate> internalGetDispatchRate() {
-        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
     }
 
     protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate 
dispatchRate) {
-        preValidation();
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3508,7 +3479,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveDispatchRate() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3519,12 +3489,10 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
     }
 
     protected CompletableFuture<Void> 
internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
-        preValidation();
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3535,7 +3503,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() 
{
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3546,7 +3513,6 @@ public class PersistentTopicsBase extends AdminResource {
 
 
     protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
     }
 
@@ -3554,7 +3520,6 @@ public class PersistentTopicsBase extends AdminResource {
         if (maxConsumersPerSubscription != null && maxConsumersPerSubscription 
< 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for maxConsumersPerSubscription");
         }
-        preValidation();
 
         TopicPolicies topicPolicies = getTopicPolicies(topicName)
                 .orElseGet(TopicPolicies::new);
@@ -3563,7 +3528,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalRemoveMaxConsumersPerSubscription() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3573,7 +3537,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected Optional<Long> internalGetCompactionThreshold() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
     }
 
@@ -3581,7 +3544,6 @@ public class PersistentTopicsBase extends AdminResource {
         if (compactionThreshold != null && compactionThreshold < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for compactionThreshold");
         }
-        preValidation();
 
         TopicPolicies topicPolicies = getTopicPolicies(topicName)
             .orElseGet(TopicPolicies::new);
@@ -3590,7 +3552,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
           return CompletableFuture.completedFuture(null);
@@ -3600,13 +3561,11 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<PublishRate> internalGetPublishRate() {
-        preValidation();
         return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
 
     }
 
     protected CompletableFuture<Void> internalSetPublishRate(PublishRate 
publishRate) {
-        preValidation();
         if (publishRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3617,7 +3576,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemovePublishRate() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
@@ -3627,12 +3585,10 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Optional<SubscribeRate> internalGetSubscribeRate() {
-        preValidation();
         return 
getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
     }
 
     protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate 
subscribeRate) {
-        preValidation();
         if (subscribeRate == null) {
             return CompletableFuture.completedFuture(null);
         }
@@ -3643,7 +3599,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveSubscribeRate() {
-        preValidation();
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (!topicPolicies.isPresent()) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4474fab..45b1b30 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -265,6 +265,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                                     @PathParam("namespace") 
String namespace,
                                                     @PathParam("topic") 
@Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isOffloadPoliciesSet()) {
             asyncResponse.resume(topicPolicies.getOffloadPolicies());
@@ -285,12 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                                     @ApiParam(value = "Offload 
policies for the specified topic")
                                                             OffloadPolicies 
offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed set offloadPolicies", ex);
@@ -314,6 +310,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                       @PathParam("namespace") String namespace,
                                       @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic, 
null);
     }
 
@@ -328,6 +325,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                                     @PathParam("namespace") 
String namespace,
                                                     @PathParam("topic") 
@Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
             
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
@@ -348,12 +346,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                                     @ApiParam(value = "Max 
unacked messages on consumer policies for the specified topic")
                                                             Integer 
maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) 
-> {
             if (ex instanceof RestException) {
                 log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
@@ -378,6 +371,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                                     @PathParam("namespace") 
String namespace,
                                                     @PathParam("topic") 
@Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
             
asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
@@ -398,9 +392,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                                     @ApiParam(value = 
"Interval to take deduplication snapshot for the specified topic")
                                                             Integer interval) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
+        preValidation();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
@@ -427,12 +419,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                                    @PathParam("namespace") 
String namespace,
                                                    @PathParam("topic") 
@Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         internalSetDeduplicationSnapshotInterval(null).whenComplete((res, ex) 
-> {
             if (ex instanceof RestException) {
                 log.error("Failed delete deduplicationSnapshotInterval", ex);
@@ -470,6 +457,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                          @PathParam("namespace") String 
namespace,
                                          @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isInactiveTopicPoliciesSet()) {
             asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
@@ -490,12 +478,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                                 @ApiParam(value = "inactive 
topic policies for the specified topic")
                                                         InactiveTopicPolicies 
inactiveTopicPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) 
-> {
             if (ex instanceof RestException) {
                 log.error("Failed set InactiveTopicPolicies", ex);
@@ -533,6 +516,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                                     @PathParam("namespace") 
String namespace,
                                                     @PathParam("topic") 
@Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
             
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
@@ -553,12 +537,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                                     @ApiParam(value = "Max 
unacked messages on subscription policies for the specified topic")
                                                             Integer 
maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, 
ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
@@ -598,6 +577,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                                            @PathParam("namespace") String 
namespace,
                                            @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isDelayedDeliveryEnabledSet() && 
topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
             asyncResponse.resume(new 
DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
@@ -618,12 +598,7 @@ public class PersistentTopics extends PersistentTopicsBase 
{
                                            @PathParam("topic") @Encoded String 
encodedTopic,
                                            @ApiParam(value = "Delayed delivery 
policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        validatePoliciesReadOnlyAccess();
-        checkTopicLevelPolicyEnable();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
+        preValidation();
         internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
     }
 
@@ -1483,6 +1458,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         return getTopicPolicies(topicName)
                 .map(TopicPolicies::getBackLogQuotaMap)
                 .map(map -> {
@@ -1507,6 +1483,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType 
backlogQuotaType, BacklogQuota backlogQuota) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
     }
 
@@ -1521,6 +1498,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType 
backlogQuotaType) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
@@ -1534,6 +1512,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                              @PathParam("namespace") String namespace,
                              @PathParam("topic") @Encoded String encodedTopic) 
{
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         return getTopicPolicies(topicName)
                 .map(TopicPolicies::getMessageTTLInSeconds)
                 .orElse(0);  //same as default ttl at namespace level
@@ -1553,6 +1532,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                               @ApiParam(value = "TTL in seconds for the 
specified namespace", required = true)
                               @QueryParam("messageTTL") int messageTTL) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMessageTTL(asyncResponse, messageTTL);
     }
 
@@ -1568,6 +1548,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                               @PathParam("namespace") String namespace,
                               @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMessageTTL(asyncResponse, null);
     }
 
@@ -1582,6 +1563,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                              @PathParam("namespace") String namespace,
                              @PathParam("topic") @Encoded String encodedTopic) 
{
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
         if (topicPolicies.isDeduplicationSet()) {
             asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
@@ -1602,6 +1584,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                              @PathParam("topic") @Encoded String encodedTopic,
                              @ApiParam(value = "DeduplicationEnabled policies 
for the specified topic") Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated deduplication", ex);
@@ -1642,6 +1625,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             internalGetRetention(asyncResponse);
         } catch (RestException e) {
@@ -1665,6 +1649,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Retention policies for the specified 
namespace") RetentionPolicies retention) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetRetention(retention).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated retention", ex);
@@ -1698,6 +1683,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                              @PathParam("namespace") String namespace,
                              @PathParam("topic") @Encoded String encodedTopic) 
{
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveRetention().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed updated retention", ex);
@@ -1724,6 +1710,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                @PathParam("namespace") String namespace,
                                @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<PersistencePolicies> persistencePolicies = 
internalGetPersistence();
             if (!persistencePolicies.isPresent()) {
@@ -1752,6 +1739,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                @PathParam("topic") @Encoded String 
encodedTopic,
                                @ApiParam(value = "Bookkeeper persistence 
policies for specified topic") PersistencePolicies persistencePolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated persistence policies", ex);
@@ -1785,6 +1773,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                   @PathParam("namespace") String namespace,
                                   @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemovePersistence().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed updated retention", ex);
@@ -1812,6 +1801,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("namespace") String namespace,
                                 @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Integer> maxSubscriptionsPerTopic = 
internalGetMaxSubscriptionsPerTopic();
             if (!maxSubscriptionsPerTopic.isPresent()) {
@@ -1841,6 +1831,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "The max subscriptions of 
the topic") int maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, 
ex) -> {
             if (ex instanceof RestException) {
                 log.error("Updating maxSubscriptionsPerTopic failed", ex);
@@ -1870,6 +1861,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove maxSubscriptionsPerTopic", ex);
@@ -1896,6 +1888,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("namespace") String namespace,
                                 @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Integer> maxProducers = internalGetMaxProducers();
             if (!maxProducers.isPresent()) {
@@ -1924,6 +1917,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "The max producers of the 
topic") int maxProducers) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated persistence policies", ex);
@@ -1954,6 +1948,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveMaxProducers().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove maxProducers", ex);
@@ -1980,6 +1975,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("namespace") String namespace,
                                 @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Integer> maxConsumers = internalGetMaxConsumers();
             if (!maxConsumers.isPresent()) {
@@ -2008,6 +2004,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "The max consumers of the 
topic") int maxConsumers) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated persistence policies", ex);
@@ -2038,6 +2035,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveMaxConsumers().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove maxConsumers", ex);
@@ -2064,6 +2062,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                   @PathParam("namespace") String namespace,
                                   @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Integer> policies = internalGetMaxMessageSize();
             if (policies.isPresent()) {
@@ -2092,6 +2091,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                   @PathParam("topic") @Encoded String 
encodedTopic,
                                   @ApiParam(value = "The max message size of 
the topic") int maxMessageSize) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated persistence policies", ex);
@@ -2122,6 +2122,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove maxMessageSize", ex);
@@ -2351,6 +2352,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
             if (!dispatchRate.isPresent()) {
@@ -2378,6 +2380,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "Dispatch rate for the 
specified topic") DispatchRate dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic dispatch rate", ex);
@@ -2411,6 +2414,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveDispatchRate().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic dispatch rate", ex);
@@ -2438,6 +2442,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<DispatchRate> dispatchRate = 
internalGetSubscriptionDispatchRate();
             if (!dispatchRate.isPresent()) {
@@ -2465,6 +2470,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "Subscription message 
dispatch rate for the specified topic") DispatchRate dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) 
-> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic: {} subscription dispatch 
rate", topicName.getLocalName(), ex);
@@ -2498,6 +2504,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic: {} subscription dispatch 
rate", topicName.getLocalName(), ex);
@@ -2525,6 +2532,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                        @PathParam("namespace") String 
namespace,
                                        @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Long> compactionThreshold = 
internalGetCompactionThreshold();
             if (!compactionThreshold.isPresent()) {
@@ -2552,6 +2560,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "Dispatch rate for the 
specified topic") long compactionThreshold) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetCompactionThreshold(compactionThreshold).whenComplete((r, 
ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic dispatch rate", ex);
@@ -2585,6 +2594,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic dispatch rate", ex);
@@ -2612,6 +2622,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                        @PathParam("namespace") String 
namespace,
                                        @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<Integer> maxConsumersPerSubscription = 
internalGetMaxConsumersPerSubscription();
             if (!maxConsumersPerSubscription.isPresent()) {
@@ -2639,6 +2650,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                        @PathParam("topic") @Encoded String 
encodedTopic,
                                        @ApiParam(value = "Dispatch rate for 
the specified topic") int maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r,
 ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic {} max consumers per 
subscription ", topicName.getLocalName(), ex);
@@ -2672,6 +2684,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                           @PathParam("namespace") String 
namespace,
                                           @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic {} max consuners per 
subscription", topicName.getLocalName(), ex);
@@ -2699,6 +2712,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<PublishRate> publishRate = internalGetPublishRate();
             if (!publishRate.isPresent()) {
@@ -2726,6 +2740,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "Dispatch rate for the 
specified topic") PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic dispatch rate", ex);
@@ -2759,6 +2774,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemovePublishRate().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic publish rate", ex);
@@ -2786,6 +2802,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("namespace") String namespace,
                                 @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         try {
             Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
             if (!subscribeRate.isPresent()) {
@@ -2813,6 +2830,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                 @PathParam("topic") @Encoded String 
encodedTopic,
                                 @ApiParam(value = "Subscribe rate for the 
specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed to set topic {} subscribe rate", 
topicName.getLocalName(), ex);
@@ -2846,6 +2864,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
+        preValidation();
         internalRemoveSubscribeRate().whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("Failed to remove topic {} subscribe rate ", 
topicName.getLocalName(), ex);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f0c6a25..1e2b2d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -122,7 +123,9 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
-        mockPulsarSetup.cleanup();
+        if (mockPulsarSetup != null) {
+            mockPulsarSetup.cleanup();
+        }
     }
 
     @DataProvider(name = "topicType")
@@ -242,6 +245,70 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         consumer2.close();
     }
 
+    @Test
+    public void testTopicPoliciesWithMultiBroker() throws Exception {
+        //setup cluster with 3 broker
+        cleanup();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        super.internalSetup();
+        admin.clusters().createCluster("test"
+                , new ClusterData(pulsar.getWebServiceAddress() + 
",localhost:1026," + "localhost:2050"));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns1", 
Sets.newHashSet("test"));
+        conf.setBrokerServicePort(Optional.of(1024));
+        conf.setBrokerServicePortTls(Optional.of(1025));
+        conf.setWebServicePort(Optional.of(1026));
+        conf.setWebServicePortTls(Optional.of(1027));
+        @Cleanup
+        PulsarService pulsar2 = startBrokerWithoutAuthorization(conf);
+        conf.setBrokerServicePort(Optional.of(2048));
+        conf.setBrokerServicePortTls(Optional.of(2049));
+        conf.setWebServicePort(Optional.of(2050));
+        conf.setWebServicePortTls(Optional.of(2051));
+        @Cleanup
+        PulsarService pulsar3 = startBrokerWithoutAuthorization(conf);
+        @Cleanup
+        PulsarAdmin admin2 = 
PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
+        @Cleanup
+        PulsarAdmin admin3 = 
PulsarAdmin.builder().serviceHttpUrl(pulsar3.getWebServiceAddress()).build();
+
+        //for partitioned topic, we can get topic policies from every broker
+        final String topic = "persistent://prop-xyz/ns1/test-" + 
UUID.randomUUID();
+        int partitionNum = 3;
+        admin.topics().createPartitionedTopic(topic, partitionNum);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
+        TopicName topicName = TopicName.get(topic);
+        Awaitility.await().until(()-> 
pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
+
+        setTopicPoliciesAndValidate(admin2, admin3, topic);
+        //for non-partitioned topic, we can get topic policies from every 
broker
+        final String topic2 = "persistent://prop-xyz/ns1/test-" + 
UUID.randomUUID();
+        
pulsarClient.newConsumer().topic(topic2).subscriptionName("sub").subscribe().close();
+        setTopicPoliciesAndValidate(admin2, admin3, topic2);
+    }
+
+    private void setTopicPoliciesAndValidate(PulsarAdmin admin2
+            , PulsarAdmin admin3, String topic) throws Exception {
+        admin.topics().setMaxUnackedMessagesOnConsumer(topic, 100);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
+        admin.topics().setMaxConsumers(topic, 101);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getMaxConsumers(topic)));
+        admin.topics().setMaxProducers(topic, 102);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getMaxProducers(topic)));
+
+        
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 
100);
+        
assertEquals(admin2.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 
100);
+        
assertEquals(admin3.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 
100);
+        assertEquals(admin.topics().getMaxConsumers(topic).intValue(), 101);
+        assertEquals(admin2.topics().getMaxConsumers(topic).intValue(), 101);
+        assertEquals(admin3.topics().getMaxConsumers(topic).intValue(), 101);
+        assertEquals(admin.topics().getMaxProducers(topic).intValue(), 102);
+        assertEquals(admin2.topics().getMaxProducers(topic).intValue(), 102);
+        assertEquals(admin3.topics().getMaxProducers(topic).intValue(), 102);
+    }
+
     /**
      * verifies admin api command for non-persistent topic. It verifies: 
partitioned-topic, stats
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index adcaedf..1efc66c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -256,15 +256,21 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     protected PulsarService startBroker(ServiceConfiguration conf) throws 
Exception {
-        PulsarService pulsar = spy(new PulsarService(conf));
 
-        setupBrokerMocks(pulsar);
         boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
         // enable authorization to initialize authorization service which is 
used by grant-permission
         conf.setAuthorizationEnabled(true);
-        pulsar.start();
+        PulsarService pulsar = startBrokerWithoutAuthorization(conf);
         conf.setAuthorizationEnabled(isAuthorizationEnabled);
+        return pulsar;
+    }
 
+    protected PulsarService 
startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
+        PulsarService pulsar = spy(new PulsarService(conf));
+        setupBrokerMocks(pulsar);
+        pulsar.start();
+        log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}", 
pulsar.getBrokerServiceUrl(),
+                pulsar.getWebServiceAddress());
         return pulsar;
     }
 
@@ -414,4 +420,4 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
-}
\ No newline at end of file
+}

Reply via email to