This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 32aa263693c6b95f9d41637efb6aba5097c73e70
Author: lipenghui <[email protected]>
AuthorDate: Fri Jul 30 09:57:44 2021 +0800

    Add backoff for setting for getting topic policies. (#11487)
    
    Currently, if we start a new broker which does not owned any namepsaces 
bundles.
    Then when use the pulsar-admin to setting or getting topic policies, we 
will get
    `topic policies have not been initialized yet` error log and the admin 
operation will
    get failed.
    
    The root cause is we are failed immediately without any retry while the 
topic polices
    cache have not init yet. So the PR to introduce the backoff for setting or 
getting
    the topic policy if encounter the topic policies cache not init exception
    
    Remove the cache init check for the tests.
    
    (cherry picked from commit bebaadf2087019dc187fd7a91f491dee1fca034d)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  47 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 681 +++++++++++----------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  70 +--
 .../pulsar/broker/service/BacklogQuotaManager.java |  10 +-
 .../SystemTopicBasedTopicPoliciesService.java      |   6 -
 .../broker/service/TopicPoliciesService.java       |  12 -
 .../broker/admin/AdminApiDelayedDelivery.java      |   2 -
 .../broker/admin/AdminApiMaxUnackedMessages.java   |   2 -
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   9 -
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |   1 -
 .../broker/admin/MaxUnackedMessagesTest.java       |   2 -
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 135 +---
 .../broker/service/InactiveTopicDeleteTest.java    |  10 -
 .../broker/service/ReplicatorRateLimiterTest.java  |   2 -
 .../pulsar/broker/service/ReplicatorTest.java      |   6 -
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   2 -
 .../service/persistent/DelayedDeliveryTest.java    |   4 -
 .../service/persistent/TopicDuplicationTest.java   |   2 -
 .../apache/pulsar/client/impl/BackoffBuilder.java  |   3 +-
 19 files changed, 438 insertions(+), 568 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 530b350..8f1e780 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -29,6 +29,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
@@ -43,6 +46,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -80,6 +85,7 @@ public abstract class AdminResource extends PulsarWebResource 
{
     public static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = 
"partitioned-topics";
     private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
+    private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
 
     protected BookKeeper bookKeeper() {
         return pulsar().getBookKeeperClient();
@@ -363,19 +369,46 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return 
pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, 
namespacePath);
     }
 
-    protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
+    protected CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsyncWithRetry(TopicName topicName) {
+        return internalGetTopicPoliciesAsyncWithRetry(topicName,
+                new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null);
+    }
+
+    protected CompletableFuture<Optional<TopicPolicies>> 
internalGetTopicPoliciesAsyncWithRetry(TopicName topicName,
+            final AtomicLong remainingTime, final Backoff backoff, 
CompletableFuture<Optional<TopicPolicies>> future) {
+        CompletableFuture<Optional<TopicPolicies>> response = future == null ? 
new CompletableFuture<>() : future;
         try {
             checkTopicLevelPolicyEnable();
-            return 
Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName));
+            response.complete(Optional.ofNullable(pulsar()
+                    .getTopicPoliciesService().getTopicPolicies(topicName)));
         } catch (RestException re) {
-            throw re;
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
-            log.error("Topic {} policies have not been initialized yet.", 
topicName);
-            throw new RestException(e);
+            response.completeExceptionally(re);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            Backoff usedBackoff = backoff == null ? new BackoffBuilder()
+                    .setInitialTime(500, TimeUnit.MILLISECONDS)
+                    .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
+                    .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
+                    .create() : backoff;
+            long nextDelay = Math.min(usedBackoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                response.completeExceptionally(new TimeoutException(
+                        String.format("Failed to get topic policy withing 
configured timeout %s ms",
+                                DEFAULT_GET_TOPIC_POLICY_TIMEOUT)));
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.error("Topic {} policies have not been initialized 
yet, retry after {}ms",
+                            topicName, nextDelay);
+                }
+                pulsar().getExecutor().schedule(() -> {
+                    remainingTime.addAndGet(-nextDelay);
+                    internalGetTopicPoliciesAsyncWithRetry(topicName, 
remainingTime, usedBackoff, response);
+                }, nextDelay, TimeUnit.MILLISECONDS);
+            }
         } catch (Exception e) {
             log.error("[{}] Failed to get topic policies {}", clientAppId(), 
topicName, e);
-            throw new RestException(e);
+            response.completeExceptionally(e);
         }
+        return response;
     }
 
     protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies 
retention) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c1f6800..3d78116 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -504,16 +504,14 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
-        TopicPolicies topicPolicies;
-        try {
-            topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-            topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? 
null : deliveryPolicies.isActive());
-            topicPolicies.setDelayedDeliveryTickTimeMillis(
-                    deliveryPolicies == null ? null : 
deliveryPolicies.getTickTime());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == 
null ? null : deliveryPolicies.isActive());
+                topicPolicies.setDelayedDeliveryTickTimeMillis(
+                        deliveryPolicies == null ? null : 
deliveryPolicies.getTickTime());
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int 
numPartitions, Set<String> clusters) {
@@ -744,42 +742,41 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<DelayedDeliveryPolicies> 
internalGetDelayedDeliveryPolicies(boolean applied) {
-        TopicPolicies policies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        DelayedDeliveryPolicies delayedDeliveryPolicies = null;
-        if (policies.isDelayedDeliveryEnabledSet() && 
policies.isDelayedDeliveryTickTimeMillisSet()) {
-            delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
-                    .tickTime(policies.getDelayedDeliveryTickTimeMillis())
-                    .active(policies.getDelayedDeliveryEnabled())
-                    .build();
-        }
-        if (delayedDeliveryPolicies == null && applied) {
-            delayedDeliveryPolicies = 
getNamespacePolicies(namespaceName).delayed_delivery_policies;
-            if (delayedDeliveryPolicies == null) {
-                delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
-                        
.tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
-                        
.active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
-                        .build();
-            }
-        }
-        return CompletableFuture.completedFuture(delayedDeliveryPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> {
+                TopicPolicies policies = op.orElseGet(TopicPolicies::new);
+                DelayedDeliveryPolicies delayedDeliveryPolicies = null;
+                if (policies.isDelayedDeliveryEnabledSet() && 
policies.isDelayedDeliveryTickTimeMillisSet()) {
+                    delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
+                            
.tickTime(policies.getDelayedDeliveryTickTimeMillis())
+                            .active(policies.getDelayedDeliveryEnabled())
+                            .build();
+                }
+                if (delayedDeliveryPolicies == null && applied) {
+                    delayedDeliveryPolicies = 
getNamespacePolicies(namespaceName).delayed_delivery_policies;
+                    if (delayedDeliveryPolicies == null) {
+                        delayedDeliveryPolicies = 
DelayedDeliveryPolicies.builder()
+                                
.tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
+                                
.active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
+                                .build();
+                    }
+                }
+                return delayedDeliveryPolicies;
+            });
     }
 
     protected CompletableFuture<OffloadPoliciesImpl> 
internalGetOffloadPolicies(boolean applied) {
-        CompletableFuture<OffloadPoliciesImpl> res = new CompletableFuture<>();
-        try {
-            OffloadPoliciesImpl offloadPolicies =
-                    
getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
-            if (applied) {
-                OffloadPoliciesImpl namespacePolicy =
-                        (OffloadPoliciesImpl) 
getNamespacePolicies(namespaceName).offload_policies;
-                offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
-                        , namespacePolicy, 
pulsar().getConfiguration().getProperties());
-            }
-            res.complete(offloadPolicies);
-        } catch (Exception e) {
-            res.completeExceptionally(e);
-        }
-        return res;
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> {
+                OffloadPoliciesImpl offloadPolicies = 
op.map(TopicPolicies::getOffloadPolicies).orElse(null);
+                if (applied) {
+                    OffloadPoliciesImpl namespacePolicy =
+                            (OffloadPoliciesImpl) 
getNamespacePolicies(namespaceName).offload_policies;
+                    offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
+                            , namespacePolicy, 
pulsar().getConfiguration().getProperties());
+                }
+                return offloadPolicies;
+            });
     }
 
     protected CompletableFuture<Void> 
internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
@@ -821,8 +818,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<InactiveTopicPolicies> 
internalGetInactiveTopicPolicies(boolean applied) {
-        InactiveTopicPolicies inactiveTopicPolicies = 
getTopicPolicies(topicName)
-                .map(TopicPolicies::getInactiveTopicPolicies)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies)
                 .orElseGet(() -> {
                     if (applied) {
                         InactiveTopicPolicies policies = 
getNamespacePolicies(namespaceName).inactive_topic_policies;
@@ -832,8 +829,7 @@ public class PersistentTopicsBase extends AdminResource {
                                 
config().isBrokerDeleteInactiveTopicsEnabled()) : policies;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(inactiveTopicPolicies);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
@@ -882,8 +878,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Integer> 
internalGetMaxUnackedMessagesOnSubscription(boolean applied) {
-        Integer maxNum = getTopicPolicies(topicName)
-                .map(TopicPolicies::getMaxUnackedMessagesOnSubscription)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> 
op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription)
                 .orElseGet(() -> {
                     if (applied) {
                         Integer maxUnackedNum = 
getNamespacePolicies(namespaceName)
@@ -891,8 +887,7 @@ public class PersistentTopicsBase extends AdminResource {
                         return maxUnackedNum == null ? 
config().getMaxUnackedMessagesPerSubscription() : maxUnackedNum;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(maxNum);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
@@ -916,16 +911,15 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Integer> 
internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
-        Integer maxNum = getTopicPolicies(topicName)
-                .map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> 
op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
                 .orElseGet(() -> {
                     if (applied) {
                         Integer maxUnacked = 
getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
                         return maxUnacked == null ? 
config().getMaxUnackedMessagesPerConsumer() : maxUnacked;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(maxNum);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
@@ -953,10 +947,12 @@ public class PersistentTopicsBase extends AdminResource {
         if (interval != null && interval < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "interval must 
be 0 or more");
         }
-        TopicPolicies policies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        policies.setDeduplicationSnapshotIntervalSeconds(interval);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
policies);
-
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies policies = op.orElseGet(TopicPolicies::new);
+                policies.setDeduplicationSnapshotIntervalSeconds(interval);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
policies);
+            });
     }
 
     private void internalUnloadNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
@@ -2650,80 +2646,81 @@ public class PersistentTopicsBase extends AdminResource 
{
         return offlineTopicStats;
     }
 
-    protected Map<BacklogQuota.BacklogQuotaType, BacklogQuota> 
internalGetBacklogQuota(boolean applied) {
-        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = 
getTopicPolicies(topicName)
-                .map(TopicPolicies::getBackLogQuotaMap)
-                .map(map -> {
-                    HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> 
hashMap = Maps.newHashMap();
-                    map.forEach((key, value) -> 
hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key), value));
-                    return hashMap;
-                }).orElse(Maps.newHashMap());
-        if (applied && quotaMap.isEmpty()) {
-            quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map;
-            if (quotaMap.isEmpty()) {
-                String namespace = namespaceName.toString();
-                quotaMap.put(
-                        BacklogQuota.BacklogQuotaType.destination_storage,
-                        namespaceBacklogQuota(namespace, 
AdminResource.path(POLICIES, namespace))
-                );
-
-            }
-        }
-        return quotaMap;
+    protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, 
BacklogQuota>> internalGetBacklogQuota(
+            boolean applied) {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> {
+                Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = op
+                        .map(TopicPolicies::getBackLogQuotaMap)
+                        .map(map -> {
+                            HashMap<BacklogQuota.BacklogQuotaType, 
BacklogQuota> hashMap = Maps.newHashMap();
+                            map.forEach((key, value) -> 
hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key), value));
+                            return hashMap;
+                        }).orElse(Maps.newHashMap());
+                if (applied && quotaMap.isEmpty()) {
+                    quotaMap = 
getNamespacePolicies(namespaceName).backlog_quota_map;
+                    if (quotaMap.isEmpty()) {
+                        String namespace = namespaceName.toString();
+                        quotaMap.put(
+                                
BacklogQuota.BacklogQuotaType.destination_storage,
+                                namespaceBacklogQuota(namespace, 
AdminResource.path(POLICIES, namespace))
+                        );
+                    }
+                }
+                return quotaMap;
+            });
     }
 
     protected CompletableFuture<Void> 
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
                                            BacklogQuotaImpl backlogQuota) {
         validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, 
PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
-        TopicPolicies topicPolicies;
-        if (backlogQuotaType == null) {
-            backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
-        }
-        try {
-            topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
 
-        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, 
topicPolicies);
-        if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
-            log.warn(
-                    "[{}] Failed to update backlog configuration for topic {}: 
conflicts with retention quota",
-                    clientAppId(), topicName);
-            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
-                    "Backlog Quota exceeds configured retention quota for 
topic. "
-                            + "Please increase retention quota and retry"));
-        }
+        BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType 
== null
+                ? BacklogQuota.BacklogQuotaType.destination_storage : 
backlogQuotaType;
+
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                RetentionPolicies retentionPolicies = 
getRetentionPolicies(topicName, topicPolicies);
+                if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
+                    log.warn(
+                            "[{}] Failed to update backlog configuration for 
topic {}: conflicts with retention quota",
+                            clientAppId(), topicName);
+                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                            "Backlog Quota exceeds configured retention quota 
for topic. "
+                                    + "Please increase retention quota and 
retry"));
+                }
 
-        if (backlogQuota != null) {
-            topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), 
backlogQuota);
-        } else {
-            topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
-        }
-        Map<String, BacklogQuotaImpl> backLogQuotaMap = 
topicPolicies.getBackLogQuotaMap();
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies).thenRun(() -> {
-            try {
-                log.info("[{}] Successfully updated backlog quota map: 
namespace={}, topic={}, map={}",
-                        clientAppId(),
-                        namespaceName,
-                        topicName.getLocalName(),
-                        jsonMapper().writeValueAsString(backLogQuotaMap));
-            } catch (JsonProcessingException ignore) { }
-        });
+                if (backlogQuota != null) {
+                    
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), 
backlogQuota);
+                } else {
+                    
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
+                }
+                Map<String, BacklogQuotaImpl> backLogQuotaMap = 
topicPolicies.getBackLogQuotaMap();
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                    .thenRun(() -> {
+                        try {
+                            log.info("[{}] Successfully updated backlog quota 
map: namespace={}, topic={}, map={}",
+                                    clientAppId(),
+                                    namespaceName,
+                                    topicName.getLocalName(),
+                                    
jsonMapper().writeValueAsString(backLogQuotaMap));
+                        } catch (JsonProcessingException ignore) { }
+                });
+            });
     }
 
     protected CompletableFuture<Boolean> internalGetDeduplication(boolean 
applied) {
-        Boolean deduplicationEnabled = getTopicPolicies(topicName)
-                .map(TopicPolicies::getDeduplicationEnabled)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
                 .orElseGet(() -> {
                     if (applied) {
                         Boolean enabled = 
getNamespacePolicies(namespaceName).deduplicationEnabled;
                         return enabled == null ? 
config().isBrokerDeduplicationEnabled() : enabled;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(deduplicationEnabled);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetDeduplication(Boolean 
enabled) {
@@ -2747,20 +2744,16 @@ public class PersistentTopicsBase extends AdminResource 
{
             return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
                     "Invalid value for message TTL"));
         }
-        TopicPolicies topicPolicies;
-        try {
-            topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-        topicPolicies.setMessageTTLInSeconds(ttlInSecond);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies).thenRun(() -> {
-            log.info("[{}] Successfully set topic message ttl: namespace={}, 
topic={}, ttl={}",
-                    clientAppId(),
-                    namespaceName,
-                    topicName.getLocalName(),
-                    ttlInSecond);
-        });
+
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMessageTTLInSeconds(ttlInSecond);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                        .thenRun(() ->
+                                log.info("[{}] Successfully set topic message 
ttl: namespace={}, topic={}, ttl={}",
+                                        clientAppId(), namespaceName, 
topicName.getLocalName(), ttlInSecond));
+            });
     }
 
     private RetentionPolicies getRetentionPolicies(TopicName topicName, 
TopicPolicies topicPolicies) {
@@ -2777,92 +2770,96 @@ public class PersistentTopicsBase extends AdminResource 
{
         return retentionPolicies;
     }
 
-    protected RetentionPolicies internalGetRetention(boolean applied) {
-        return getTopicPolicies(topicName)
-                .map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
-                    if (applied) {
-                        RetentionPolicies policies = 
getNamespacePolicies(namespaceName).retention_policies;
-                        return policies == null ? new RetentionPolicies(
-                                config().getDefaultRetentionTimeInMinutes(), 
config().getDefaultRetentionSizeInMB())
-                                : policies;
-                    }
-                    return null;
-                });
+    protected CompletableFuture<RetentionPolicies> 
internalGetRetention(boolean applied) {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> 
op.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
+                if (applied) {
+                    RetentionPolicies policies = 
getNamespacePolicies(namespaceName).retention_policies;
+                    return policies == null ? new RetentionPolicies(
+                            config().getDefaultRetentionTimeInMinutes(), 
config().getDefaultRetentionSizeInMB())
+                            : policies;
+                }
+                return null;
+            }));
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention) {
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies;
-        try {
-            topicPolicies = getTopicPolicies(topicName)
-                    .orElseGet(TopicPolicies::new);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-        BacklogQuota backlogQuota =
-                    
topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
-        if (backlogQuota == null) {
-            Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
-            backlogQuota = 
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
-        }
-        if (!checkBacklogQuota(backlogQuota, retention)) {
-            log.warn(
-                    "[{}] Failed to update retention quota configuration for 
topic {}: conflicts with retention quota",
-                    clientAppId(), topicName);
-            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
-                    "Retention Quota must exceed configured backlog quota for 
topic. "
-                            + "Please increase retention quota and retry"));
-        }
-        topicPolicies.setRetentionPolicies(retention);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                BacklogQuota backlogQuota =
+                        topicPolicies.getBackLogQuotaMap()
+                                
.get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+                if (backlogQuota == null) {
+                    Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
+                    backlogQuota = 
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+                }
+                if (!checkBacklogQuota(backlogQuota, retention)) {
+                    log.warn(
+                            "[{}] Failed to update retention quota 
configuration for topic {}: "
+                                    + "conflicts with retention quota",
+                            clientAppId(), topicName);
+                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                            "Retention Quota must exceed configured backlog 
quota for topic. "
+                                    + "Please increase retention quota and 
retry"));
+                }
+                topicPolicies.setRetentionPolicies(retention);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemoveRetention() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setRetentionPolicies(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setRetentionPolicies(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<PersistencePolicies> 
internalGetPersistence(boolean applied) {
-        PersistencePolicies persistencePolicies = getTopicPolicies(topicName)
-                .map(TopicPolicies::getPersistence)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getPersistence)
                 .orElseGet(() -> {
                     if (applied) {
                         PersistencePolicies namespacePolicy = 
getNamespacePolicies(namespaceName)
                                 .persistence;
                         return namespacePolicy == null
                                 ? new PersistencePolicies(
-                                        
pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
-                                        
pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
-                                        
pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
-                                        
pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit())
+                                
pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
+                                
pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
+                                
pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
+                                
pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit())
                                 : namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(persistencePolicies);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies) {
         validatePersistencePolicies(persistencePolicies);
-
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setPersistence(persistencePolicies);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setPersistence(persistencePolicies);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemovePersistence() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setPersistence(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setPersistence(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<Void> internalSetMaxMessageSize(Integer 
maxMessageSize) {
@@ -2872,26 +2869,29 @@ public class PersistentTopicsBase extends AdminResource 
{
                     + "and must be smaller than that in the broker-level");
         }
 
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxMessageSize(maxMessageSize);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMaxMessageSize(maxMessageSize);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
-    protected Optional<Integer> internalGetMaxMessageSize() {
-        return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize() 
{
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxMessageSize));
     }
 
     protected CompletableFuture<Integer> internalGetMaxProducers(boolean 
applied) {
-        Integer maxNum = getTopicPolicies(topicName)
-                .map(TopicPolicies::getMaxProducerPerTopic)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getMaxProducerPerTopic)
                 .orElseGet(() -> {
                     if (applied) {
                         Integer maxProducer = 
getNamespacePolicies(namespaceName).max_producers_per_topic;
                         return maxProducer == null ? 
config().getMaxProducersPerTopic() : maxProducer;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(maxNum);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetMaxProducers(Integer 
maxProducers) {
@@ -2899,13 +2899,18 @@ public class PersistentTopicsBase extends AdminResource 
{
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxProducers must be 0 or more");
         }
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxProducerPerTopic(maxProducers);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMaxProducerPerTopic(maxProducers);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
+
     }
 
-    protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
-        return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
+    protected CompletableFuture<Optional<Integer>> 
internalGetMaxSubscriptionsPerTopic() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> 
op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
     }
 
     protected CompletableFuture<Void> 
internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
@@ -2914,14 +2919,17 @@ public class PersistentTopicsBase extends AdminResource 
{
                     "maxSubscriptionsPerTopic must be 0 or more");
         }
 
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                
topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<DispatchRateImpl> 
internalGetReplicatorDispatchRate(boolean applied) {
-        DispatchRateImpl dispatchRate = getTopicPolicies(topicName)
-                .map(TopicPolicies::getReplicatorDispatchRate)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
                 .orElseGet(() -> {
                     if (applied) {
                         DispatchRateImpl namespacePolicy = 
getNamespacePolicies(namespaceName)
@@ -2929,14 +2937,16 @@ public class PersistentTopicsBase extends AdminResource 
{
                         return namespacePolicy == null ? 
replicatorDispatchRate() : namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(dispatchRate);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setReplicatorDispatchRate(dispatchRate);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setReplicatorDispatchRate(dispatchRate);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> preValidation(boolean authoritative) {
@@ -2966,25 +2976,26 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<Void> internalRemoveMaxProducers() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxProducerPerTopic(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setMaxProducerPerTopic(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<Integer> internalGetMaxConsumers(boolean 
applied) {
-        Integer maxNum = getTopicPolicies(topicName)
-                .map(TopicPolicies::getMaxConsumerPerTopic)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getMaxConsumerPerTopic)
                 .orElseGet(() -> {
                     if (applied) {
                         Integer maxConsumer = 
getNamespacePolicies(namespaceName).max_consumers_per_topic;
                         return maxConsumer == null ? 
config().getMaxConsumersPerTopic() : maxConsumer;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(maxNum);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetMaxConsumers(Integer 
maxConsumers) {
@@ -2992,19 +3003,24 @@ public class PersistentTopicsBase extends AdminResource 
{
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxConsumers must be 0 or more");
         }
-
-        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxConsumerPerTopic(maxConsumers);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMaxConsumerPerTopic(maxConsumers);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemoveMaxConsumers() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxConsumerPerTopic(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setMaxConsumerPerTopic(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
+
     }
 
     protected MessageId internalTerminate(boolean authoritative) {
@@ -3862,8 +3878,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<DispatchRateImpl> 
internalGetDispatchRate(boolean applied) {
-        DispatchRateImpl dispatchRate = getTopicPolicies(topicName)
-                .map(TopicPolicies::getDispatchRate)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getDispatchRate)
                 .orElseGet(() -> {
                     if (applied) {
                         DispatchRateImpl namespacePolicy = 
getNamespacePolicies(namespaceName)
@@ -3871,33 +3887,35 @@ public class PersistentTopicsBase extends AdminResource 
{
                         return namespacePolicy == null ? dispatchRate() : 
namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(dispatchRate);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetDispatchRate(DispatchRateImpl 
dispatchRate) {
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setDispatchRate(dispatchRate);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setDispatchRate(dispatchRate);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemoveDispatchRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setDispatchRate(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
-
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setDispatchRate(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<DispatchRate> 
internalGetSubscriptionDispatchRate(boolean applied) {
-        DispatchRate dispatchRate = getTopicPolicies(topicName)
-                .map(TopicPolicies::getSubscriptionDispatchRate)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate)
                 .orElseGet(() -> {
                     if (applied) {
                         DispatchRateImpl namespacePolicy = 
getNamespacePolicies(namespaceName)
@@ -3905,57 +3923,64 @@ public class PersistentTopicsBase extends AdminResource 
{
                         return namespacePolicy == null ? 
subscriptionDispatchRate() : namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(dispatchRate);
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setSubscriptionDispatchRate(dispatchRate);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() 
{
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setSubscriptionDispatchRate(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setSubscriptionDispatchRate(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
 
-    protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
-        return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
+    protected CompletableFuture<Optional<Integer>> 
internalGetMaxConsumersPerSubscription() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> 
op.map(TopicPolicies::getMaxConsumersPerSubscription));
     }
 
     protected CompletableFuture<Void> 
internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
         if (maxConsumersPerSubscription != null && maxConsumersPerSubscription 
< 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for maxConsumersPerSubscription");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        
topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                
topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> 
internalRemoveMaxConsumersPerSubscription() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxConsumersPerSubscription(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setMaxConsumersPerSubscription(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<Long> internalGetCompactionThreshold(boolean 
applied) {
-        Long threshold = getTopicPolicies(topicName)
-                .map(TopicPolicies::getCompactionThreshold)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getCompactionThreshold)
                 .orElseGet(() -> {
                     if (applied) {
                         Long namespacePolicy = 
getNamespacePolicies(namespaceName).compaction_threshold;
@@ -3964,8 +3989,7 @@ public class PersistentTopicsBase extends AdminResource {
                                 : namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(threshold);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetCompactionThreshold(Long 
compactionThreshold) {
@@ -3973,62 +3997,74 @@ public class PersistentTopicsBase extends AdminResource 
{
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for compactionThreshold");
         }
 
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setCompactionThreshold(compactionThreshold);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setCompactionThreshold(compactionThreshold);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
+
     }
 
     protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-          return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setCompactionThreshold(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setCompactionThreshold(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
-    protected Optional<PublishRate> internalGetPublishRate() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
-
+    protected CompletableFuture<Optional<PublishRate>> 
internalGetPublishRate() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getPublishRate));
     }
 
     protected CompletableFuture<Void> internalSetPublishRate(PublishRate 
publishRate) {
         if (publishRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setPublishRate(publishRate);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setPublishRate(publishRate);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
-    protected Optional<List<SubType>> internalGetSubscriptionTypesEnabled() {
-        return 
getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionTypesEnabled);
+    protected CompletableFuture<Optional<List<SubType>>> 
internalGetSubscriptionTypesEnabled() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> 
op.map(TopicPolicies::getSubscriptionTypesEnabled));
     }
 
     protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
             Set<SubscriptionType> subscriptionTypesEnabled) {
         List<SubType> subTypes = Lists.newArrayList();
         subscriptionTypesEnabled.forEach(subscriptionType -> 
subTypes.add(SubType.valueOf(subscriptionType.name())));
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        topicPolicies.setSubscriptionTypesEnabled(subTypes);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setSubscriptionTypesEnabled(subTypes);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemovePublishRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setPublishRate(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setPublishRate(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected CompletableFuture<SubscribeRate> 
internalGetSubscribeRate(boolean applied) {
-        SubscribeRate subscribeRate = getTopicPolicies(topicName)
-                .map(TopicPolicies::getSubscribeRate)
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenApply(op -> op.map(TopicPolicies::getSubscribeRate)
                 .orElseGet(() -> {
                     if (applied) {
                         SubscribeRate namespacePolicy = 
getNamespacePolicies(namespaceName)
@@ -4036,27 +4072,30 @@ public class PersistentTopicsBase extends AdminResource 
{
                         return namespacePolicy == null ? subscribeRate() : 
namespacePolicy;
                     }
                     return null;
-                });
-        return CompletableFuture.completedFuture(subscribeRate);
+                }));
     }
 
     protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate 
subscribeRate) {
         if (subscribeRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        topicPolicies.setSubscribeRate(subscribeRate);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setSubscribeRate(subscribeRate);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalRemoveSubscribeRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setSubscribeRate(null);
-        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setSubscribeRate(null);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
     }
 
     protected void internalHandleResult(AsyncResponse asyncResponse,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 20e3f75..a0699a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -27,7 +27,6 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -52,7 +51,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -417,8 +415,9 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElse(new TopicPolicies());
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+            .thenAccept(op -> {
+                TopicPolicies topicPolicies = op.orElse(new TopicPolicies());
                 
asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
             })
             .exceptionally(ex -> {
@@ -1551,7 +1550,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenAccept(__ -> 
asyncResponse.resume(internalGetBacklogQuota(applied)))
+            .thenCompose(__ -> internalGetBacklogQuota(applied))
+            .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getBacklogQuotaMap", ex, 
asyncResponse);
                 return null;
@@ -1625,18 +1625,17 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenAccept(__ ->
-                asyncResponse.resume(getTopicPolicies(topicName)
-                    .map(TopicPolicies::getMessageTTLInSeconds)
-                    .orElseGet(() -> {
-                        if (applied) {
-                            Integer otherLevelTTL = 
getNamespacePolicies(namespaceName).message_ttl_in_seconds;
-                            return otherLevelTTL == null ? 
pulsar().getConfiguration().getTtlDurationDefaultInSeconds()
-                                    : otherLevelTTL;
-                        }
-                        return null;
-                    }))
-            )
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+            .thenAccept(op -> asyncResponse.resume(op
+                .map(TopicPolicies::getMessageTTLInSeconds)
+                .orElseGet(() -> {
+                    if (applied) {
+                        Integer otherLevelTTL = 
getNamespacePolicies(namespaceName).message_ttl_in_seconds;
+                        return otherLevelTTL == null ? 
pulsar().getConfiguration().getTtlDurationDefaultInSeconds()
+                                : otherLevelTTL;
+                    }
+                    return null;
+                })))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMessageTTL", ex, asyncResponse);
                 return null;
@@ -1788,7 +1787,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> asyncResponse.resume(internalGetRetention(applied)))
+            .thenCompose(__ -> internalGetRetention(applied))
+            .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getRetention", ex, asyncResponse);
                 return null;
@@ -1971,11 +1971,9 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxSubscriptionsPerTopic = 
internalGetMaxSubscriptionsPerTopic();
-                asyncResponse.resume(maxSubscriptionsPerTopic.isPresent() ? 
maxSubscriptionsPerTopic.get()
-                        : Response.noContent().build());
-            })
+            .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
+            .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
+                    : Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxSubscriptions", ex, 
asyncResponse);
                 return null;
@@ -2317,8 +2315,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> policies = internalGetMaxMessageSize();
+            .thenCompose(__ -> internalGetMaxMessageSize())
+            .thenAccept(policies -> {
                 asyncResponse.resume(policies.isPresent() ? policies.get() : 
Response.noContent().build());
             })
             .exceptionally(ex -> {
@@ -2887,11 +2885,9 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxConsumersPerSubscription = 
internalGetMaxConsumersPerSubscription();
-                asyncResponse.resume(maxConsumersPerSubscription.isPresent() ? 
maxConsumersPerSubscription.get()
-                        : Response.noContent().build());
-            })
+            .thenCompose(__ -> internalGetMaxConsumersPerSubscription())
+            .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
+                    : Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxConsumersPerSubscription", 
ex, asyncResponse);
                 return null;
@@ -2983,11 +2979,9 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<PublishRate> publishRate = internalGetPublishRate();
-                asyncResponse.resume(publishRate.isPresent() ? 
publishRate.get()
-                        : Response.noContent().build());
-            })
+            .thenCompose(__ -> internalGetPublishRate())
+            .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
+                    : Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getPublishRate", ex, 
asyncResponse);
                 return null;
@@ -3077,9 +3071,9 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<List<SubType>> subscriptionTypesEnabled = 
internalGetSubscriptionTypesEnabled();
-                asyncResponse.resume(subscriptionTypesEnabled.isPresent() ? 
subscriptionTypesEnabled.get()
+            .thenCompose(__ -> internalGetSubscriptionTypesEnabled())
+            .thenAccept(op -> {
+                asyncResponse.resume(op.isPresent() ? op.get()
                         : Response.noContent().build());
             })
             .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 21792b3..c941b6c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -85,12 +85,10 @@ public class BacklogQuotaManager {
         }
 
         try {
-            if 
(pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)) {
-                return 
Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
-                        .map(TopicPolicies::getBackLogQuotaMap)
-                        .map(map -> 
map.get(BacklogQuotaType.destination_storage.name()))
-                        .orElseGet(() -> 
getBacklogQuota(topicName.getNamespace(), policyPath));
-            }
+            return 
Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
+                    .map(TopicPolicies::getBackLogQuotaMap)
+                    .map(map -> 
map.get(BacklogQuotaType.destination_storage.name()))
+                    .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), 
policyPath));
         } catch (Exception e) {
             log.warn("Failed to read topic policies data, will apply the 
namespace backlog quota: topicName={}",
                     topicName, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ab4a521..53bc099 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -148,12 +148,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @Override
-    public boolean cacheIsInitialized(TopicName topicName) {
-        return policyCacheInitMap.containsKey(topicName.getNamespaceObject())
-                && policyCacheInitMap.get(topicName.getNamespaceObject());
-    }
-
-    @Override
     public TopicPolicies getTopicPolicies(TopicName topicName) throws 
TopicPoliciesCacheNotInitException {
         if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 1086b45..2f42b64 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -80,13 +80,6 @@ public interface TopicPoliciesService {
      */
     void start();
 
-    /**
-     * whether the cache has been initialized.
-     * @param topicName
-     * @return
-     */
-    boolean cacheIsInitialized(TopicName topicName);
-
     void registerListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener);
 
     void unregisterListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener);
@@ -139,11 +132,6 @@ public interface TopicPoliciesService {
         }
 
         @Override
-        public boolean cacheIsInitialized(TopicName topicName) {
-            return false;
-        }
-
-        @Override
         public void registerListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener) {
             //No-op
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
index 3f010fa..71da565 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
@@ -150,8 +150,6 @@ public class AdminApiDelayedDelivery extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + namespace + "/test" + 
UUID.randomUUID();
         admin.namespaces().createNamespace(namespace);
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         //namespace-level default value is null
         assertNull(admin.namespaces().getDelayedDelivery(namespace));
         //topic-level default value is null
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
index 91a8f44..7d6010e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
@@ -145,8 +145,6 @@ public class AdminApiMaxUnackedMessages extends 
MockedPulsarServiceBaseTest {
         for (int i = 0; i < 50; i++) {
             producer.send("msg".getBytes());
         }
-        Awaitility.await().until(()
-                -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         
assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
         assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic));
         admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 
namespaceLevelPolicy);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 2894943..9697459 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -192,8 +192,6 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         final String topicName = testTopic + UUID.randomUUID().toString();
         admin.topics().createPartitionedTopic(topicName, 3);
         pulsarClient.newProducer().topic(topicName).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
         OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) 
admin.topics().getOffloadPolicies(topicName);
         assertNull(offloadPolicies);
         OffloadPoliciesImpl offload = new OffloadPoliciesImpl();
@@ -215,8 +213,6 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         final String topicName = testTopic + UUID.randomUUID().toString();
         admin.topics().createPartitionedTopic(topicName, 3);
         pulsarClient.newProducer().topic(topicName).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
         OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) 
admin.topics().getOffloadPolicies(topicName, true);
         OffloadPoliciesImpl brokerPolicies = OffloadPoliciesImpl
                 .mergeConfiguration(null,null, 
pulsar.getConfiguration().getProperties());
@@ -277,8 +273,6 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
             admin.topics().createNonPartitionedTopic(topicName);
         }
         
pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close();
-        Awaitility.await()
-                .until(()-> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
         //2 namespace level policy should use NullLedgerOffloader by default
         if (isPartitioned) {
             for (int i = 0; i < partitionNum; i++) {
@@ -306,9 +300,6 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
         
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
-
         //4 set topic level offload policies
         admin.topics().setOffloadPolicies(topicName, offloadPolicies);
         Awaitility.await().untilAsserted(()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 830c8f1..864e75d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1956,7 +1956,6 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         final String namespace = "prop-xyz/ns1";
         pulsarClient.newProducer().topic(topic).create().close();
         TopicName topicName = TopicName.get(topic);
-        Awaitility.await().until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
         PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
         PersistentTopic mockTopic = spy(persistentTopic);
         mockTopic.checkCompaction();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
index d231564..d6ebdcc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -333,7 +333,5 @@ public class MaxUnackedMessagesTest extends 
ProducerConsumerBase {
     private void waitCacheInit(String topicName) throws Exception {
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 8b8d944..035950e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -130,9 +130,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setBacklogQuota(testTopic, backlogQuota);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
@@ -156,9 +153,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setBacklogQuota(testTopic, backlogQuota);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
@@ -236,8 +230,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetBacklogQuotaApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertEquals(admin.topics().getBacklogQuotaMap(topic), 
Maps.newHashMap());
         assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), 
Maps.newHashMap());
         Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = 
ConfigHelper.backlogQuotaMap(conf);
@@ -302,9 +294,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setBacklogQuota(testTopic, backlogQuota);
         Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
@@ -344,9 +333,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         RetentionPolicies retention = new RetentionPolicies(60, 1024);
         log.info("Retention: {} will set to the topic: {}", retention, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setRetention(testTopic, retention);
         log.info("Retention set success on topic: {}", testTopic);
 
@@ -362,9 +348,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         RetentionPolicies retention = new RetentionPolicies(60, 1024);
         log.info("Retention: {} will set to the topic: {}", retention, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setRetention(testTopic, retention);
         log.info("Retention set success on topic: {}", testTopic);
 
@@ -382,8 +365,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testRetentionAppliedApi() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+
         RetentionPolicies brokerPolicies =
                 new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), 
conf.getDefaultRetentionSizeInMB());
         assertEquals(admin.topics().getRetention(topic, true), brokerPolicies);
@@ -411,8 +393,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetSubDispatchRateApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getSubscriptionDispatchRate(topic));
         
assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace));
 
@@ -452,8 +432,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testRetentionPriority() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getRetention(topic));
         assertNull(admin.namespaces().getRetention(myNamespace));
 
@@ -510,8 +488,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetPersistenceApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getPersistence(topic));
         assertNull(admin.namespaces().getPersistence(myNamespace));
         PersistencePolicies brokerPolicy
@@ -590,9 +566,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
         PersistencePolicies persistencePolicies = new PersistencePolicies(3, 
3, 3, 0.1);
         log.info("PersistencePolicies: {} will set to the topic: {}", 
persistencePolicies, persistenceTopic);
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
         admin.topics().createNonPartitionedTopic(persistenceTopic);
         admin.topics().setPersistence(persistenceTopic, persistencePolicies);
 
@@ -623,8 +596,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetDispatchRateApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getDispatchRate(topic));
         assertNull(admin.namespaces().getDispatchRate(myNamespace));
         DispatchRate brokerDispatchRate = DispatchRate.builder()
@@ -669,9 +640,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
         PersistencePolicies persistencePolicies = new PersistencePolicies(3, 
3, 3, 0.1);
         log.info("PersistencePolicies: {} will set to the topic: {}", 
persistencePolicies, persistenceTopic);
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
         admin.topics().createNonPartitionedTopic(persistenceTopic);
         admin.topics().setPersistence(persistenceTopic, persistencePolicies);
 
@@ -711,8 +679,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetMaxProducerApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getMaxProducers(topic));
         assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace));
         assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), 
conf.getMaxProducersPerTopic());
@@ -737,9 +703,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Integer maxProducers = 2;
         log.info("MaxProducers: {} will set to the topic: {}", maxProducers, 
persistenceTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxProducers(persistenceTopic, maxProducers);
 
@@ -768,9 +731,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testRemoveMaxProducers() throws Exception {
         Integer maxProducers = 2;
         log.info("MaxProducers: {} will set to the topic: {}", maxProducers, 
persistenceTopic);
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxProducers(persistenceTopic, maxProducers);
 
@@ -831,9 +791,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setDispatchRate(testTopic, dispatchRate);
         log.info("Dispatch Rate set success on topic: {}", testTopic);
 
@@ -853,9 +810,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setDispatchRate(testTopic, dispatchRate);
         log.info("Dispatch Rate set success on topic: {}", testTopic);
 
@@ -874,9 +828,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(200)
                 .dispatchThrottlingRateInByte(20000)
@@ -934,9 +885,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         // set namespace level inactive topic policies
         InactiveTopicPolicies inactiveTopicPolicies =
                 new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
@@ -995,9 +943,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(1000)
                 .dispatchThrottlingRateInByte(1024 * 1024)
@@ -1029,9 +974,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(1000)
                 .dispatchThrottlingRateInByte(1024 * 1024)
@@ -1063,9 +1005,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(1000)
                 .dispatchThrottlingRateInByte(1024 * 1024)
@@ -1108,9 +1047,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-
         // set namespace level subscription dispatch rate
         DispatchRate namespaceDispatchRate =                     
DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(100)
@@ -1169,9 +1105,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Long compactionThreshold = 100000L;
         log.info("Compaction threshold: {} will set to the topic: {}", 
compactionThreshold, testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
         log.info("Compaction threshold set success on topic: {}", testTopic);
 
@@ -1186,9 +1119,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Long compactionThreshold = 100000L;
         log.info("Compaction threshold: {} will set to the topic: {}", 
compactionThreshold, testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
         log.info("Compaction threshold set success on topic: {}", testTopic);
 
@@ -1208,9 +1138,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Integer maxConsumersPerSubscription = 10;
         log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", 
maxConsumersPerSubscription, testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setMaxConsumersPerSubscription(testTopic, 
maxConsumersPerSubscription);
         log.info("MaxConsumersPerSubscription set success on topic: {}", 
testTopic);
 
@@ -1225,9 +1152,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Integer maxConsumersPerSubscription = 10;
         log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", 
maxConsumersPerSubscription, testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setMaxConsumersPerSubscription(testTopic, 
maxConsumersPerSubscription);
         log.info("MaxConsumersPerSubscription set success on topic: {}", 
testTopic);
 
@@ -1247,9 +1171,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
         log.info("Publish Rate: {} will set to the topic: {}", publishRate, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setPublishRate(testTopic, publishRate);
         log.info("Publish Rate set success on topic: {}", testTopic);
 
@@ -1264,9 +1185,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
         log.info("Publish Rate: {} will set to the topic: {}", publishRate, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setPublishRate(testTopic, publishRate);
         log.info("Publish Rate set success on topic: {}", testTopic);
 
@@ -1298,8 +1216,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetMaxConsumersApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getMaxConsumers(topic));
         assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
         assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 
conf.getMaxConsumersPerTopic());
@@ -1327,9 +1243,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         log.info("MaxConsumers: {} will set to the namespace: {}", 1, 
myNamespace);
         Integer maxConsumers = 2;
         log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, 
persistenceTopic);
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
 
@@ -1359,8 +1272,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testRemoveMaxConsumers() throws Exception {
         Integer maxConsumers = 2;
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
 
@@ -1411,9 +1322,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetSetSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         SubscribeRate subscribeRate1 = new SubscribeRate(1, 30);
         log.info("Subscribe Rate: {} will be set to the namespace: {}", 
subscribeRate1, myNamespace);
         admin.namespaces().setSubscribeRate(myNamespace, subscribeRate1);
@@ -1466,8 +1374,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetSubscribeRateApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getSubscribeRate(topic));
         assertNull(admin.namespaces().getSubscribeRate(myNamespace));
         SubscribeRate brokerPolicy = new SubscribeRate(
@@ -1502,8 +1408,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         String mySub = "my-sub";
         conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await().until(() ->
-                
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         List<Consumer<String>> consumerList = new ArrayList<>();
         ConsumerBuilder<String> builder = 
pulsarClient.newConsumer(Schema.STRING)
                 .subscriptionType(SubscriptionType.Shared)
@@ -1563,9 +1467,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testRemoveSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
 
-        Awaitility.await()
-             .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
-
         SubscribeRate subscribeRate = new SubscribeRate(2, 30);
         log.info("Subscribe Rate: {} will set to the topic: {}", 
subscribeRate, persistenceTopic);
         admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
@@ -1646,9 +1547,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         conf.setMaxPublishRatePerTopicInBytes(50L);
         setup();
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         final String topicName = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         pulsarClient.newProducer().topic(topicName).create().close();
         Field publishMaxMessageRate = 
PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate");
@@ -1713,8 +1611,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testTopicMaxMessageSizeApi() throws Exception{
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
         admin.topics().createNonPartitionedTopic(persistenceTopic);
         assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
         admin.topics().setMaxMessageSize(persistenceTopic,10);
@@ -1752,8 +1648,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         }
         // init cache
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getMaxMessageSize(topic));
         // set msg size
         admin.topics().setMaxMessageSize(topic, 10);
@@ -1798,8 +1692,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         int maxSubInNamespace = 2;
         List<Consumer> consumers = new ArrayList<>();
         ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer().subscriptionMode(subMode)
@@ -1830,8 +1722,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
 
         assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic));
         // set max subscriptions
@@ -1856,8 +1746,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         // Set topic-level max subscriptions
         final int topicLevelMaxSubNum = 2;
         admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
@@ -1903,8 +1791,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         // init cache
         @Cleanup
         Producer producer = pulsarClient.newProducer().topic(topic).create();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         //default value is null
         
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
         int msgNum = 100;
@@ -1992,8 +1878,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         // Set topic-level max subscriptions
         final int topicLevelMaxSubNum = 2;
         admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
@@ -2073,8 +1957,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
 
         assertNull(admin.topics().getReplicatorDispatchRate(topic));
 
@@ -2096,8 +1978,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetReplicatorRateApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getReplicatorDispatchRate(topic));
         assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace));
         DispatchRate brokerDispatchRate = DispatchRate.builder()
@@ -2140,8 +2020,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         //should not fail
         assertNull(admin.topics().getMessageTTL(topic));
     }
@@ -2150,9 +2028,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testSubscriptionTypesEnabled() throws Exception {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         // use broker.conf
         
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
         assertNull(admin.topics().getSubscriptionTypesEnabled(topic));
@@ -2207,9 +2082,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final String topic = "non-persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
         Producer producer = pulsarClient.newProducer().topic(topic).create();
-
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         final String subName = "my-sub";
         ConsumerBuilder builder = pulsarClient.newConsumer()
                 .subscriptionType(SubscriptionType.Shared)
@@ -2259,8 +2131,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     public void testGetCompactionThresholdApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         assertNull(admin.topics().getCompactionThreshold(topic));
         assertNull(admin.namespaces().getCompactionThreshold(myNamespace));
         long brokerPolicy = 
pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
@@ -2316,9 +2186,6 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
         admin.topics().setBacklogQuota(testTopic, backlogQuota);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index a73cd50..2b33844 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -386,8 +386,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         admin.topics().createPartitionedTopic(topicName, 3);
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);
-        Awaitility.await().until(()
-                -> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
 
         InactiveTopicPolicies inactiveTopicPolicies = 
admin.topics().getInactiveTopicPolicies(topicName);
         assertNull(inactiveTopicPolicies);
@@ -431,9 +429,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
             //wait for cache
             
pulsarClient.newConsumer().topic(tp).subscriptionName("my-sub").subscribe().close();
             TopicName topicName = TopicName.get(tp);
-            while 
(!pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)) {
-                Thread.sleep(500);
-            }
         }
 
         InactiveTopicPolicies inactiveTopicPolicies =
@@ -513,9 +508,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
             producer.close();
             Thread.sleep(1);
         }
-        //wait for cache init
-        Awaitility.await().until(()
-                -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic3)));
         // "topic" use delete_when_no_subscriptions, "topic2" use 
delete_when_subscriptions_caught_up
         // "topic3" use default:delete_when_no_subscriptions
         InactiveTopicPolicies inactiveTopicPolicies =
@@ -555,8 +547,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         final String namespace = "prop/ns-abc";
         final String topic = "persistent://prop/ns-abc/test-" + 
UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await()
-                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         //namespace-level default value is null
         assertNull(admin.namespaces().getInactiveTopicPolicies(namespace));
         //topic-level default value is null
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 93ad914..879db02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -97,8 +97,6 @@ public class ReplicatorRateLimiterTest extends 
ReplicatorTestBase {
                 .statsInterval(0, TimeUnit.SECONDS).build();
         client1.newProducer().topic(topicName).create().close();
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
-        Awaitility.await()
-                .until(() -> 
pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
 
         //use broker-level by default
         
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1e1dca3..071160b0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1236,12 +1236,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         String systemTopic = TopicName.get("persistent", 
NamespaceName.get(namespace),
                 EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME).toString();
         admin1.topics().createNonPartitionedTopic(topic);
-        Awaitility.await()
-                .until(() -> 
pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-        Awaitility.await()
-                .until(() -> 
pulsar2.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
-        Awaitility.await()
-                .until(() -> 
pulsar3.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
         admin1.topics().setRetention(topic, new RetentionPolicies(10, 10));
         admin2.topics().setRetention(topic, new RetentionPolicies(20, 20));
         admin3.topics().setRetention(topic, new RetentionPolicies(30, 30));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index d0a04b6..63680db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -191,8 +191,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         TopicName topicName = TopicName.get(topic);
         admin.topics().createPartitionedTopic(topic, 3);
         pulsarClient.newProducer().topic(topic).create().close();
-        Awaitility.await().untilAsserted(()
-                -> 
systemTopicBasedTopicPoliciesService.cacheIsInitialized(topicName));
         admin.topics().setMaxConsumers(topic, 1000);
         Awaitility.await().untilAsserted(() ->
                 assertNotNull(admin.topics().getMaxConsumers(topic)));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 0e095e2..cb870f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -336,8 +336,6 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
 
         admin.topics().createPartitionedTopic(topicName, 3);
         pulsarClient.newProducer().topic(topicName).create().close();
-        Awaitility.await().untilAsserted(() -> pulsar.getTopicPoliciesService()
-                .cacheIsInitialized(TopicName.get(topicName)));
         assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
         DelayedDeliveryPolicies delayedDeliveryPolicies = 
DelayedDeliveryPolicies.builder()
                 .tickTime(2000)
@@ -372,8 +370,6 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
 
         admin.topics().createPartitionedTopic(topicName, 3);
         pulsarClient.newProducer().topic(topicName).create().close();
-        Awaitility.await().untilAsserted(() -> pulsar.getTopicPoliciesService()
-                .cacheIsInitialized(TopicName.get(topicName)));
         assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
         //1 Set topic policy
         DelayedDeliveryPolicies delayedDeliveryPolicies = 
DelayedDeliveryPolicies.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 48ccdba..c8f09fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -508,8 +508,6 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
     private void waitCacheInit(String topicName) throws Exception {
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);
-        Awaitility.await()
-                .until(()-> 
pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
     }
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index 762f8f2..8106968 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -32,8 +32,7 @@ public class BackoffBuilder {
     private long mandatoryStop;
     private TimeUnit unitMandatoryStop;
     
-    @VisibleForTesting
-    BackoffBuilder() {
+    public BackoffBuilder() {
         this.initial = 0;
         this.max = 0;
         this.mandatoryStop = 0;

Reply via email to