This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 32aa263693c6b95f9d41637efb6aba5097c73e70 Author: lipenghui <[email protected]> AuthorDate: Fri Jul 30 09:57:44 2021 +0800 Add backoff for setting for getting topic policies. (#11487) Currently, if we start a new broker which does not owned any namepsaces bundles. Then when use the pulsar-admin to setting or getting topic policies, we will get `topic policies have not been initialized yet` error log and the admin operation will get failed. The root cause is we are failed immediately without any retry while the topic polices cache have not init yet. So the PR to introduce the backoff for setting or getting the topic policy if encounter the topic policies cache not init exception Remove the cache init check for the tests. (cherry picked from commit bebaadf2087019dc187fd7a91f491dee1fca034d) --- .../apache/pulsar/broker/admin/AdminResource.java | 47 +- .../broker/admin/impl/PersistentTopicsBase.java | 681 +++++++++++---------- .../pulsar/broker/admin/v2/PersistentTopics.java | 70 +-- .../pulsar/broker/service/BacklogQuotaManager.java | 10 +- .../SystemTopicBasedTopicPoliciesService.java | 6 - .../broker/service/TopicPoliciesService.java | 12 - .../broker/admin/AdminApiDelayedDelivery.java | 2 - .../broker/admin/AdminApiMaxUnackedMessages.java | 2 - .../pulsar/broker/admin/AdminApiOffloadTest.java | 9 - .../apache/pulsar/broker/admin/AdminApiTest2.java | 1 - .../broker/admin/MaxUnackedMessagesTest.java | 2 - .../pulsar/broker/admin/TopicPoliciesTest.java | 135 +--- .../broker/service/InactiveTopicDeleteTest.java | 10 - .../broker/service/ReplicatorRateLimiterTest.java | 2 - .../pulsar/broker/service/ReplicatorTest.java | 6 - .../SystemTopicBasedTopicPoliciesServiceTest.java | 2 - .../service/persistent/DelayedDeliveryTest.java | 4 - .../service/persistent/TopicDuplicationTest.java | 2 - .../apache/pulsar/client/impl/BackoffBuilder.java | 3 +- 19 files changed, 438 insertions(+), 568 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 530b350..8f1e780 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -29,6 +29,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.servlet.ServletContext; import javax.ws.rs.WebApplicationException; @@ -43,6 +46,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -80,6 +85,7 @@ public abstract class AdminResource extends PulsarWebResource { public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; + private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; protected BookKeeper bookKeeper() { return pulsar().getBookKeeperClient(); @@ -363,19 +369,46 @@ public abstract class AdminResource extends PulsarWebResource { return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath); } - protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) { + protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) { + return internalGetTopicPoliciesAsyncWithRetry(topicName, + new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null); + } + + protected CompletableFuture<Optional<TopicPolicies>> internalGetTopicPoliciesAsyncWithRetry(TopicName topicName, + final AtomicLong remainingTime, final Backoff backoff, CompletableFuture<Optional<TopicPolicies>> future) { + CompletableFuture<Optional<TopicPolicies>> response = future == null ? new CompletableFuture<>() : future; try { checkTopicLevelPolicyEnable(); - return Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName)); + response.complete(Optional.ofNullable(pulsar() + .getTopicPoliciesService().getTopicPolicies(topicName))); } catch (RestException re) { - throw re; - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){ - log.error("Topic {} policies have not been initialized yet.", topicName); - throw new RestException(e); + response.completeExceptionally(re); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { + Backoff usedBackoff = backoff == null ? new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .create() : backoff; + long nextDelay = Math.min(usedBackoff.next(), remainingTime.get()); + if (nextDelay <= 0) { + response.completeExceptionally(new TimeoutException( + String.format("Failed to get topic policy withing configured timeout %s ms", + DEFAULT_GET_TOPIC_POLICY_TIMEOUT))); + } else { + if (log.isDebugEnabled()) { + log.error("Topic {} policies have not been initialized yet, retry after {}ms", + topicName, nextDelay); + } + pulsar().getExecutor().schedule(() -> { + remainingTime.addAndGet(-nextDelay); + internalGetTopicPoliciesAsyncWithRetry(topicName, remainingTime, usedBackoff, response); + }, nextDelay, TimeUnit.MILLISECONDS); + } } catch (Exception e) { log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e); - throw new RestException(e); + response.completeExceptionally(e); } + return response; } protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c1f6800..3d78116 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -504,16 +504,14 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) { - TopicPolicies topicPolicies; - try { - topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); - topicPolicies.setDelayedDeliveryTickTimeMillis( - deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); - } catch (Exception e) { - return FutureUtil.failedFuture(e); - } - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); + topicPolicies.setDelayedDeliveryTickTimeMillis( + deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) { @@ -744,42 +742,41 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied) { - TopicPolicies policies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - DelayedDeliveryPolicies delayedDeliveryPolicies = null; - if (policies.isDelayedDeliveryEnabledSet() && policies.isDelayedDeliveryTickTimeMillisSet()) { - delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() - .tickTime(policies.getDelayedDeliveryTickTimeMillis()) - .active(policies.getDelayedDeliveryEnabled()) - .build(); - } - if (delayedDeliveryPolicies == null && applied) { - delayedDeliveryPolicies = getNamespacePolicies(namespaceName).delayed_delivery_policies; - if (delayedDeliveryPolicies == null) { - delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() - .tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis()) - .active(pulsar().getConfiguration().isDelayedDeliveryEnabled()) - .build(); - } - } - return CompletableFuture.completedFuture(delayedDeliveryPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> { + TopicPolicies policies = op.orElseGet(TopicPolicies::new); + DelayedDeliveryPolicies delayedDeliveryPolicies = null; + if (policies.isDelayedDeliveryEnabledSet() && policies.isDelayedDeliveryTickTimeMillisSet()) { + delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() + .tickTime(policies.getDelayedDeliveryTickTimeMillis()) + .active(policies.getDelayedDeliveryEnabled()) + .build(); + } + if (delayedDeliveryPolicies == null && applied) { + delayedDeliveryPolicies = getNamespacePolicies(namespaceName).delayed_delivery_policies; + if (delayedDeliveryPolicies == null) { + delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() + .tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis()) + .active(pulsar().getConfiguration().isDelayedDeliveryEnabled()) + .build(); + } + } + return delayedDeliveryPolicies; + }); } protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied) { - CompletableFuture<OffloadPoliciesImpl> res = new CompletableFuture<>(); - try { - OffloadPoliciesImpl offloadPolicies = - getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null); - if (applied) { - OffloadPoliciesImpl namespacePolicy = - (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies; - offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies - , namespacePolicy, pulsar().getConfiguration().getProperties()); - } - res.complete(offloadPolicies); - } catch (Exception e) { - res.completeExceptionally(e); - } - return res; + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> { + OffloadPoliciesImpl offloadPolicies = op.map(TopicPolicies::getOffloadPolicies).orElse(null); + if (applied) { + OffloadPoliciesImpl namespacePolicy = + (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies; + offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies + , namespacePolicy, pulsar().getConfiguration().getProperties()); + } + return offloadPolicies; + }); } protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { @@ -821,8 +818,8 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) { - InactiveTopicPolicies inactiveTopicPolicies = getTopicPolicies(topicName) - .map(TopicPolicies::getInactiveTopicPolicies) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies) .orElseGet(() -> { if (applied) { InactiveTopicPolicies policies = getNamespacePolicies(namespaceName).inactive_topic_policies; @@ -832,8 +829,7 @@ public class PersistentTopicsBase extends AdminResource { config().isBrokerDeleteInactiveTopicsEnabled()) : policies; } return null; - }); - return CompletableFuture.completedFuture(inactiveTopicPolicies); + })); } protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) { @@ -882,8 +878,8 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied) { - Integer maxNum = getTopicPolicies(topicName) - .map(TopicPolicies::getMaxUnackedMessagesOnSubscription) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription) .orElseGet(() -> { if (applied) { Integer maxUnackedNum = getNamespacePolicies(namespaceName) @@ -891,8 +887,7 @@ public class PersistentTopicsBase extends AdminResource { return maxUnackedNum == null ? config().getMaxUnackedMessagesPerSubscription() : maxUnackedNum; } return null; - }); - return CompletableFuture.completedFuture(maxNum); + })); } protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) { @@ -916,16 +911,15 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) { - Integer maxNum = getTopicPolicies(topicName) - .map(TopicPolicies::getMaxUnackedMessagesOnConsumer) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer) .orElseGet(() -> { if (applied) { Integer maxUnacked = getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer; return maxUnacked == null ? config().getMaxUnackedMessagesPerConsumer() : maxUnacked; } return null; - }); - return CompletableFuture.completedFuture(maxNum); + })); } protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) { @@ -953,10 +947,12 @@ public class PersistentTopicsBase extends AdminResource { if (interval != null && interval < 0) { throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more"); } - TopicPolicies policies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - policies.setDeduplicationSnapshotIntervalSeconds(interval); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies); - + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies policies = op.orElseGet(TopicPolicies::new); + policies.setDeduplicationSnapshotIntervalSeconds(interval); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies); + }); } private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { @@ -2650,80 +2646,81 @@ public class PersistentTopicsBase extends AdminResource { return offlineTopicStats; } - protected Map<BacklogQuota.BacklogQuotaType, BacklogQuota> internalGetBacklogQuota(boolean applied) { - Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = getTopicPolicies(topicName) - .map(TopicPolicies::getBackLogQuotaMap) - .map(map -> { - HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap(); - map.forEach((key, value) -> hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key), value)); - return hashMap; - }).orElse(Maps.newHashMap()); - if (applied && quotaMap.isEmpty()) { - quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map; - if (quotaMap.isEmpty()) { - String namespace = namespaceName.toString(); - quotaMap.put( - BacklogQuota.BacklogQuotaType.destination_storage, - namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace)) - ); - - } - } - return quotaMap; + protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> internalGetBacklogQuota( + boolean applied) { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> { + Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = op + .map(TopicPolicies::getBackLogQuotaMap) + .map(map -> { + HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap(); + map.forEach((key, value) -> hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key), value)); + return hashMap; + }).orElse(Maps.newHashMap()); + if (applied && quotaMap.isEmpty()) { + quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map; + if (quotaMap.isEmpty()) { + String namespace = namespaceName.toString(); + quotaMap.put( + BacklogQuota.BacklogQuotaType.destination_storage, + namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace)) + ); + } + } + return quotaMap; + }); } protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota) { validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); - TopicPolicies topicPolicies; - if (backlogQuotaType == null) { - backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage; - } - try { - topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - } catch (Exception e) { - return FutureUtil.failedFuture(e); - } - RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies); - if (!checkBacklogQuota(backlogQuota, retentionPolicies)) { - log.warn( - "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota", - clientAppId(), topicName); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, - "Backlog Quota exceeds configured retention quota for topic. " - + "Please increase retention quota and retry")); - } + BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null + ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType; + + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies); + if (!checkBacklogQuota(backlogQuota, retentionPolicies)) { + log.warn( + "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Backlog Quota exceeds configured retention quota for topic. " + + "Please increase retention quota and retry")); + } - if (backlogQuota != null) { - topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota); - } else { - topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name()); - } - Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap(); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> { - try { - log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - jsonMapper().writeValueAsString(backLogQuotaMap)); - } catch (JsonProcessingException ignore) { } - }); + if (backlogQuota != null) { + topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota); + } else { + topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name()); + } + Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap(); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) + .thenRun(() -> { + try { + log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + jsonMapper().writeValueAsString(backLogQuotaMap)); + } catch (JsonProcessingException ignore) { } + }); + }); } protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) { - Boolean deduplicationEnabled = getTopicPolicies(topicName) - .map(TopicPolicies::getDeduplicationEnabled) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled) .orElseGet(() -> { if (applied) { Boolean enabled = getNamespacePolicies(namespaceName).deduplicationEnabled; return enabled == null ? config().isBrokerDeduplicationEnabled() : enabled; } return null; - }); - return CompletableFuture.completedFuture(deduplicationEnabled); + })); } protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) { @@ -2747,20 +2744,16 @@ public class PersistentTopicsBase extends AdminResource { return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL")); } - TopicPolicies topicPolicies; - try { - topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - } catch (Exception e) { - return FutureUtil.failedFuture(e); - } - topicPolicies.setMessageTTLInSeconds(ttlInSecond); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> { - log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - ttlInSecond); - }); + + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMessageTTLInSeconds(ttlInSecond); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) + .thenRun(() -> + log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", + clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond)); + }); } private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) { @@ -2777,92 +2770,96 @@ public class PersistentTopicsBase extends AdminResource { return retentionPolicies; } - protected RetentionPolicies internalGetRetention(boolean applied) { - return getTopicPolicies(topicName) - .map(TopicPolicies::getRetentionPolicies).orElseGet(() -> { - if (applied) { - RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies; - return policies == null ? new RetentionPolicies( - config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB()) - : policies; - } - return null; - }); + protected CompletableFuture<RetentionPolicies> internalGetRetention(boolean applied) { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> { + if (applied) { + RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies; + return policies == null ? new RetentionPolicies( + config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB()) + : policies; + } + return null; + })); } protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) { if (retention == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies; - try { - topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - } catch (Exception e) { - return FutureUtil.failedFuture(e); - } - BacklogQuota backlogQuota = - topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name()); - if (backlogQuota == null) { - Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); - backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); - } - if (!checkBacklogQuota(backlogQuota, retention)) { - log.warn( - "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota", - clientAppId(), topicName); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, - "Retention Quota must exceed configured backlog quota for topic. " - + "Please increase retention quota and retry")); - } - topicPolicies.setRetentionPolicies(retention); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + BacklogQuota backlogQuota = + topicPolicies.getBackLogQuotaMap() + .get(BacklogQuota.BacklogQuotaType.destination_storage.name()); + if (backlogQuota == null) { + Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); + backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); + } + if (!checkBacklogQuota(backlogQuota, retention)) { + log.warn( + "[{}] Failed to update retention quota configuration for topic {}: " + + "conflicts with retention quota", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Retention Quota must exceed configured backlog quota for topic. " + + "Please increase retention quota and retry")); + } + topicPolicies.setRetentionPolicies(retention); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveRetention() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setRetentionPolicies(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setRetentionPolicies(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied) { - PersistencePolicies persistencePolicies = getTopicPolicies(topicName) - .map(TopicPolicies::getPersistence) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getPersistence) .orElseGet(() -> { if (applied) { PersistencePolicies namespacePolicy = getNamespacePolicies(namespaceName) .persistence; return namespacePolicy == null ? new PersistencePolicies( - pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(), - pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(), - pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(), - pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit()) + pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(), + pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(), + pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(), + pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit()) : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(persistencePolicies); + })); } protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) { validatePersistencePolicies(persistencePolicies); - - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setPersistence(persistencePolicies); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setPersistence(persistencePolicies); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemovePersistence() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setPersistence(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setPersistence(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) { @@ -2872,26 +2869,29 @@ public class PersistentTopicsBase extends AdminResource { + "and must be smaller than that in the broker-level"); } - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setMaxMessageSize(maxMessageSize); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMaxMessageSize(maxMessageSize); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } - protected Optional<Integer> internalGetMaxMessageSize() { - return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize); + protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize() { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxMessageSize)); } protected CompletableFuture<Integer> internalGetMaxProducers(boolean applied) { - Integer maxNum = getTopicPolicies(topicName) - .map(TopicPolicies::getMaxProducerPerTopic) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxProducerPerTopic) .orElseGet(() -> { if (applied) { Integer maxProducer = getNamespacePolicies(namespaceName).max_producers_per_topic; return maxProducer == null ? config().getMaxProducersPerTopic() : maxProducer; } return null; - }); - return CompletableFuture.completedFuture(maxNum); + })); } protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) { @@ -2899,13 +2899,18 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, "maxProducers must be 0 or more"); } - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setMaxProducerPerTopic(maxProducers); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMaxProducerPerTopic(maxProducers); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + } - protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() { - return getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic); + protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic)); } protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) { @@ -2914,14 +2919,17 @@ public class PersistentTopicsBase extends AdminResource { "maxSubscriptionsPerTopic must be 0 or more"); } - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied) { - DispatchRateImpl dispatchRate = getTopicPolicies(topicName) - .map(TopicPolicies::getReplicatorDispatchRate) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate) .orElseGet(() -> { if (applied) { DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName) @@ -2929,14 +2937,16 @@ public class PersistentTopicsBase extends AdminResource { return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(dispatchRate); + })); } protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) { - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setReplicatorDispatchRate(dispatchRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicatorDispatchRate(dispatchRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> preValidation(boolean authoritative) { @@ -2966,25 +2976,26 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveMaxProducers() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setMaxProducerPerTopic(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setMaxProducerPerTopic(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<Integer> internalGetMaxConsumers(boolean applied) { - Integer maxNum = getTopicPolicies(topicName) - .map(TopicPolicies::getMaxConsumerPerTopic) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxConsumerPerTopic) .orElseGet(() -> { if (applied) { Integer maxConsumer = getNamespacePolicies(namespaceName).max_consumers_per_topic; return maxConsumer == null ? config().getMaxConsumersPerTopic() : maxConsumer; } return null; - }); - return CompletableFuture.completedFuture(maxNum); + })); } protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers) { @@ -2992,19 +3003,24 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, "maxConsumers must be 0 or more"); } - - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); - topicPolicies.setMaxConsumerPerTopic(maxConsumers); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMaxConsumerPerTopic(maxConsumers); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveMaxConsumers() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setMaxConsumerPerTopic(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setMaxConsumerPerTopic(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); + } protected MessageId internalTerminate(boolean authoritative) { @@ -3862,8 +3878,8 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<DispatchRateImpl> internalGetDispatchRate(boolean applied) { - DispatchRateImpl dispatchRate = getTopicPolicies(topicName) - .map(TopicPolicies::getDispatchRate) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getDispatchRate) .orElseGet(() -> { if (applied) { DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName) @@ -3871,33 +3887,35 @@ public class PersistentTopicsBase extends AdminResource { return namespacePolicy == null ? dispatchRate() : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(dispatchRate); + })); } protected CompletableFuture<Void> internalSetDispatchRate(DispatchRateImpl dispatchRate) { if (dispatchRate == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setDispatchRate(dispatchRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setDispatchRate(dispatchRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveDispatchRate() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setDispatchRate(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); - + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setDispatchRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied) { - DispatchRate dispatchRate = getTopicPolicies(topicName) - .map(TopicPolicies::getSubscriptionDispatchRate) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate) .orElseGet(() -> { if (applied) { DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName) @@ -3905,57 +3923,64 @@ public class PersistentTopicsBase extends AdminResource { return namespacePolicy == null ? subscriptionDispatchRate() : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(dispatchRate); + })); } protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) { if (dispatchRate == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setSubscriptionDispatchRate(dispatchRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setSubscriptionDispatchRate(dispatchRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setSubscriptionDispatchRate(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setSubscriptionDispatchRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } - protected Optional<Integer> internalGetMaxConsumersPerSubscription() { - return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription); + protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription() { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getMaxConsumersPerSubscription)); } protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) { if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription"); } - - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setMaxConsumersPerSubscription(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setMaxConsumersPerSubscription(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied) { - Long threshold = getTopicPolicies(topicName) - .map(TopicPolicies::getCompactionThreshold) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getCompactionThreshold) .orElseGet(() -> { if (applied) { Long namespacePolicy = getNamespacePolicies(namespaceName).compaction_threshold; @@ -3964,8 +3989,7 @@ public class PersistentTopicsBase extends AdminResource { : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(threshold); + })); } protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) { @@ -3973,62 +3997,74 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold"); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setCompactionThreshold(compactionThreshold); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setCompactionThreshold(compactionThreshold); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + } protected CompletableFuture<Void> internalRemoveCompactionThreshold() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setCompactionThreshold(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setCompactionThreshold(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } - protected Optional<PublishRate> internalGetPublishRate() { - return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate); - + protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate() { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getPublishRate)); } protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) { if (publishRate == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setPublishRate(publishRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setPublishRate(publishRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } - protected Optional<List<SubType>> internalGetSubscriptionTypesEnabled() { - return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionTypesEnabled); + protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled() { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSubscriptionTypesEnabled)); } protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled( Set<SubscriptionType> subscriptionTypesEnabled) { List<SubType> subTypes = Lists.newArrayList(); subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name()))); - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setSubscriptionTypesEnabled(subTypes); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setSubscriptionTypesEnabled(subTypes); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemovePublishRate() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setPublishRate(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setPublishRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied) { - SubscribeRate subscribeRate = getTopicPolicies(topicName) - .map(TopicPolicies::getSubscribeRate) + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSubscribeRate) .orElseGet(() -> { if (applied) { SubscribeRate namespacePolicy = getNamespacePolicies(namespaceName) @@ -4036,27 +4072,30 @@ public class PersistentTopicsBase extends AdminResource { return namespacePolicy == null ? subscribeRate() : namespacePolicy; } return null; - }); - return CompletableFuture.completedFuture(subscribeRate); + })); } protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) { if (subscribeRate == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); - topicPolicies.setSubscribeRate(subscribeRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setSubscribeRate(subscribeRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); } protected CompletableFuture<Void> internalRemoveSubscribeRate() { - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setSubscribeRate(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setSubscribeRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + }); } protected void internalHandleResult(AsyncResponse asyncResponse, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 20e3f75..a0699a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -27,7 +27,6 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -52,7 +51,6 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ResetCursorData; -import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -417,8 +415,9 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) + .thenAccept(op -> { + TopicPolicies topicPolicies = op.orElse(new TopicPolicies()); asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds()); }) .exceptionally(ex -> { @@ -1551,7 +1550,8 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenAccept(__ -> asyncResponse.resume(internalGetBacklogQuota(applied))) + .thenCompose(__ -> internalGetBacklogQuota(applied)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { handleTopicPolicyException("getBacklogQuotaMap", ex, asyncResponse); return null; @@ -1625,18 +1625,17 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenAccept(__ -> - asyncResponse.resume(getTopicPolicies(topicName) - .map(TopicPolicies::getMessageTTLInSeconds) - .orElseGet(() -> { - if (applied) { - Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds; - return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds() - : otherLevelTTL; - } - return null; - })) - ) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) + .thenAccept(op -> asyncResponse.resume(op + .map(TopicPolicies::getMessageTTLInSeconds) + .orElseGet(() -> { + if (applied) { + Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds; + return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds() + : otherLevelTTL; + } + return null; + }))) .exceptionally(ex -> { handleTopicPolicyException("getMessageTTL", ex, asyncResponse); return null; @@ -1788,7 +1787,8 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> asyncResponse.resume(internalGetRetention(applied))) + .thenCompose(__ -> internalGetRetention(applied)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { handleTopicPolicyException("getRetention", ex, asyncResponse); return null; @@ -1971,11 +1971,9 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic(); - asyncResponse.resume(maxSubscriptionsPerTopic.isPresent() ? maxSubscriptionsPerTopic.get() - : Response.noContent().build()); - }) + .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic()) + .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() + : Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("getMaxSubscriptions", ex, asyncResponse); return null; @@ -2317,8 +2315,8 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - Optional<Integer> policies = internalGetMaxMessageSize(); + .thenCompose(__ -> internalGetMaxMessageSize()) + .thenAccept(policies -> { asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build()); }) .exceptionally(ex -> { @@ -2887,11 +2885,9 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription(); - asyncResponse.resume(maxConsumersPerSubscription.isPresent() ? maxConsumersPerSubscription.get() - : Response.noContent().build()); - }) + .thenCompose(__ -> internalGetMaxConsumersPerSubscription()) + .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() + : Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("getMaxConsumersPerSubscription", ex, asyncResponse); return null; @@ -2983,11 +2979,9 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - Optional<PublishRate> publishRate = internalGetPublishRate(); - asyncResponse.resume(publishRate.isPresent() ? publishRate.get() - : Response.noContent().build()); - }) + .thenCompose(__ -> internalGetPublishRate()) + .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get() + : Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("getPublishRate", ex, asyncResponse); return null; @@ -3077,9 +3071,9 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenRun(() -> { - Optional<List<SubType>> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled(); - asyncResponse.resume(subscriptionTypesEnabled.isPresent() ? subscriptionTypesEnabled.get() + .thenCompose(__ -> internalGetSubscriptionTypesEnabled()) + .thenAccept(op -> { + asyncResponse.resume(op.isPresent() ? op.get() : Response.noContent().build()); }) .exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 21792b3..c941b6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -85,12 +85,10 @@ public class BacklogQuotaManager { } try { - if (pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)) { - return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)) - .map(TopicPolicies::getBackLogQuotaMap) - .map(map -> map.get(BacklogQuotaType.destination_storage.name())) - .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath)); - } + return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)) + .map(TopicPolicies::getBackLogQuotaMap) + .map(map -> map.get(BacklogQuotaType.destination_storage.name())) + .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath)); } catch (Exception e) { log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index ab4a521..53bc099 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -148,12 +148,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } @Override - public boolean cacheIsInitialized(TopicName topicName) { - return policyCacheInitMap.containsKey(topicName.getNamespaceObject()) - && policyCacheInitMap.get(topicName.getNamespaceObject()); - } - - @Override public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) && !policyCacheInitMap.get(topicName.getNamespaceObject())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 1086b45..2f42b64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -80,13 +80,6 @@ public interface TopicPoliciesService { */ void start(); - /** - * whether the cache has been initialized. - * @param topicName - * @return - */ - boolean cacheIsInitialized(TopicName topicName); - void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener); void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener); @@ -139,11 +132,6 @@ public interface TopicPoliciesService { } @Override - public boolean cacheIsInitialized(TopicName topicName) { - return false; - } - - @Override public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) { //No-op } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java index 3f010fa..71da565 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java @@ -150,8 +150,6 @@ public class AdminApiDelayedDelivery extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + namespace + "/test" + UUID.randomUUID(); admin.namespaces().createNamespace(namespace); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); //namespace-level default value is null assertNull(admin.namespaces().getDelayedDelivery(namespace)); //topic-level default value is null diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java index 91a8f44..7d6010e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java @@ -145,8 +145,6 @@ public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest { for (int i = 0; i < 50; i++) { producer.send("msg".getBytes()); } - Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)); assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)); admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, namespaceLevelPolicy); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 2894943..9697459 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -192,8 +192,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { final String topicName = testTopic + UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(topicName, 3); pulsarClient.newProducer().topic(topicName).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) admin.topics().getOffloadPolicies(topicName); assertNull(offloadPolicies); OffloadPoliciesImpl offload = new OffloadPoliciesImpl(); @@ -215,8 +213,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { final String topicName = testTopic + UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(topicName, 3); pulsarClient.newProducer().topic(topicName).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) admin.topics().getOffloadPolicies(topicName, true); OffloadPoliciesImpl brokerPolicies = OffloadPoliciesImpl .mergeConfiguration(null,null, pulsar.getConfiguration().getProperties()); @@ -277,8 +273,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { admin.topics().createNonPartitionedTopic(topicName); } pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close(); - Awaitility.await() - .until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); //2 namespace level policy should use NullLedgerOffloader by default if (isPartitioned) { for (int i = 0; i < partitionNum; i++) { @@ -306,9 +300,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { when(topicOffloader.getOffloadDriverName()).thenReturn("mock"); doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any()); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); - //4 set topic level offload policies admin.topics().setOffloadPolicies(topicName, offloadPolicies); Awaitility.await().untilAsserted(() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 830c8f1..864e75d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -1956,7 +1956,6 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { final String namespace = "prop-xyz/ns1"; pulsarClient.newProducer().topic(topic).create().close(); TopicName topicName = TopicName.get(topic); - Awaitility.await().until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); PersistentTopic mockTopic = spy(persistentTopic); mockTopic.checkCompaction(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java index d231564..d6ebdcc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java @@ -333,7 +333,5 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase { private void waitCacheInit(String topicName) throws Exception { pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close(); TopicName topic = TopicName.get(topicName); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(topic)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 8b8d944..035950e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -130,9 +130,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setBacklogQuota(testTopic, backlogQuota); log.info("Backlog quota set success on topic: {}", testTopic); @@ -156,9 +153,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setBacklogQuota(testTopic, backlogQuota); log.info("Backlog quota set success on topic: {}", testTopic); @@ -236,8 +230,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetBacklogQuotaApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap()); Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf); @@ -302,9 +294,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setBacklogQuota(testTopic, backlogQuota); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) @@ -344,9 +333,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { RetentionPolicies retention = new RetentionPolicies(60, 1024); log.info("Retention: {} will set to the topic: {}", retention, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setRetention(testTopic, retention); log.info("Retention set success on topic: {}", testTopic); @@ -362,9 +348,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { RetentionPolicies retention = new RetentionPolicies(60, 1024); log.info("Retention: {} will set to the topic: {}", retention, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setRetention(testTopic, retention); log.info("Retention set success on topic: {}", testTopic); @@ -382,8 +365,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testRetentionAppliedApi() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + RetentionPolicies brokerPolicies = new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB()); assertEquals(admin.topics().getRetention(topic, true), brokerPolicies); @@ -411,8 +393,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetSubDispatchRateApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getSubscriptionDispatchRate(topic)); assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)); @@ -452,8 +432,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testRetentionPriority() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getRetention(topic)); assertNull(admin.namespaces().getRetention(myNamespace)); @@ -510,8 +488,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetPersistenceApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getPersistence(topic)); assertNull(admin.namespaces().getPersistence(myNamespace)); PersistencePolicies brokerPolicy @@ -590,9 +566,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1); log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); admin.topics().createNonPartitionedTopic(persistenceTopic); admin.topics().setPersistence(persistenceTopic, persistencePolicies); @@ -623,8 +596,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetDispatchRateApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getDispatchRate(topic)); assertNull(admin.namespaces().getDispatchRate(myNamespace)); DispatchRate brokerDispatchRate = DispatchRate.builder() @@ -669,9 +640,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1); log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); admin.topics().createNonPartitionedTopic(persistenceTopic); admin.topics().setPersistence(persistenceTopic, persistencePolicies); @@ -711,8 +679,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetMaxProducerApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getMaxProducers(topic)); assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)); assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic()); @@ -737,9 +703,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Integer maxProducers = 2; log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxProducers(persistenceTopic, maxProducers); @@ -768,9 +731,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testRemoveMaxProducers() throws Exception { Integer maxProducers = 2; log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxProducers(persistenceTopic, maxProducers); @@ -831,9 +791,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setDispatchRate(testTopic, dispatchRate); log.info("Dispatch Rate set success on topic: {}", testTopic); @@ -853,9 +810,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setDispatchRate(testTopic, dispatchRate); log.info("Dispatch Rate set success on topic: {}", testTopic); @@ -874,9 +828,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - DispatchRate dispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(200) .dispatchThrottlingRateInByte(20000) @@ -934,9 +885,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - // set namespace level inactive topic policies InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true); @@ -995,9 +943,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - DispatchRate dispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(1000) .dispatchThrottlingRateInByte(1024 * 1024) @@ -1029,9 +974,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - DispatchRate dispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(1000) .dispatchThrottlingRateInByte(1024 * 1024) @@ -1063,9 +1005,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - DispatchRate dispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(1000) .dispatchThrottlingRateInByte(1024 * 1024) @@ -1108,9 +1047,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - // set namespace level subscription dispatch rate DispatchRate namespaceDispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(100) @@ -1169,9 +1105,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Long compactionThreshold = 100000L; log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setCompactionThreshold(testTopic, compactionThreshold); log.info("Compaction threshold set success on topic: {}", testTopic); @@ -1186,9 +1119,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Long compactionThreshold = 100000L; log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setCompactionThreshold(testTopic, compactionThreshold); log.info("Compaction threshold set success on topic: {}", testTopic); @@ -1208,9 +1138,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Integer maxConsumersPerSubscription = 10; log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription); log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic); @@ -1225,9 +1152,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Integer maxConsumersPerSubscription = 10; log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription); log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic); @@ -1247,9 +1171,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setPublishRate(testTopic, publishRate); log.info("Publish Rate set success on topic: {}", testTopic); @@ -1264,9 +1185,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setPublishRate(testTopic, publishRate); log.info("Publish Rate set success on topic: {}", testTopic); @@ -1298,8 +1216,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetMaxConsumersApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getMaxConsumers(topic)); assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)); assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic()); @@ -1327,9 +1243,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace); Integer maxConsumers = 2; log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); @@ -1359,8 +1272,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test public void testRemoveMaxConsumers() throws Exception { Integer maxConsumers = 2; - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); @@ -1411,9 +1322,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetSetSubscribeRate() throws Exception { admin.topics().createPartitionedTopic(persistenceTopic, 2); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - SubscribeRate subscribeRate1 = new SubscribeRate(1, 30); log.info("Subscribe Rate: {} will be set to the namespace: {}", subscribeRate1, myNamespace); admin.namespaces().setSubscribeRate(myNamespace, subscribeRate1); @@ -1466,8 +1374,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetSubscribeRateApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getSubscribeRate(topic)); assertNull(admin.namespaces().getSubscribeRate(myNamespace)); SubscribeRate brokerPolicy = new SubscribeRate( @@ -1502,8 +1408,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { String mySub = "my-sub"; conf.setMaxConsumersPerSubscription(maxConsumerInBroker); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await().until(() -> - pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); List<Consumer<String>> consumerList = new ArrayList<>(); ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING) .subscriptionType(SubscriptionType.Shared) @@ -1563,9 +1467,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testRemoveSubscribeRate() throws Exception { admin.topics().createPartitionedTopic(persistenceTopic, 2); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); - SubscribeRate subscribeRate = new SubscribeRate(2, 30); log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic); admin.topics().setSubscribeRate(persistenceTopic, subscribeRate); @@ -1646,9 +1547,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { conf.setMaxPublishRatePerTopicInBytes(50L); setup(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); pulsarClient.newProducer().topic(topicName).create().close(); Field publishMaxMessageRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate"); @@ -1713,8 +1611,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @Test(timeOut = 20000) public void testTopicMaxMessageSizeApi() throws Exception{ - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); admin.topics().createNonPartitionedTopic(persistenceTopic); assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); admin.topics().setMaxMessageSize(persistenceTopic,10); @@ -1752,8 +1648,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } // init cache Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getMaxMessageSize(topic)); // set msg size admin.topics().setMaxMessageSize(topic, 10); @@ -1798,8 +1692,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); int maxSubInNamespace = 2; List<Consumer> consumers = new ArrayList<>(); ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().subscriptionMode(subMode) @@ -1830,8 +1722,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic)); // set max subscriptions @@ -1856,8 +1746,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); // Set topic-level max subscriptions final int topicLevelMaxSubNum = 2; admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); @@ -1903,8 +1791,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { // init cache @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).create(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); //default value is null assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace)); int msgNum = 100; @@ -1992,8 +1878,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); // Set topic-level max subscriptions final int topicLevelMaxSubNum = 2; admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); @@ -2073,8 +1957,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getReplicatorDispatchRate(topic)); @@ -2096,8 +1978,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetReplicatorRateApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getReplicatorDispatchRate(topic)); assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)); DispatchRate brokerDispatchRate = DispatchRate.builder() @@ -2140,8 +2020,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = testTopic + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); //should not fail assertNull(admin.topics().getMessageTTL(topic)); } @@ -2150,9 +2028,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testSubscriptionTypesEnabled() throws Exception { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); // use broker.conf pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close(); assertNull(admin.topics().getSubscriptionTypesEnabled(topic)); @@ -2207,9 +2082,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { final String topic = "non-persistent://" + myNamespace + "/test-" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); Producer producer = pulsarClient.newProducer().topic(topic).create(); - - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); final String subName = "my-sub"; ConsumerBuilder builder = pulsarClient.newConsumer() .subscriptionType(SubscriptionType.Shared) @@ -2259,8 +2131,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { public void testGetCompactionThresholdApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); assertNull(admin.topics().getCompactionThreshold(topic)); assertNull(admin.namespaces().getCompactionThreshold(myNamespace)); long brokerPolicy = pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes(); @@ -2316,9 +2186,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - admin.topics().setBacklogQuota(testTopic, backlogQuota); log.info("Backlog quota set success on topic: {}", testTopic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index a73cd50..2b33844 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -386,8 +386,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { admin.topics().createPartitionedTopic(topicName, 3); pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close(); TopicName topic = TopicName.get(topicName); - Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().cacheIsInitialized(topic)); InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName); assertNull(inactiveTopicPolicies); @@ -431,9 +429,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { //wait for cache pulsarClient.newConsumer().topic(tp).subscriptionName("my-sub").subscribe().close(); TopicName topicName = TopicName.get(tp); - while (!pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)) { - Thread.sleep(500); - } } InactiveTopicPolicies inactiveTopicPolicies = @@ -513,9 +508,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { producer.close(); Thread.sleep(1); } - //wait for cache init - Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic3))); // "topic" use delete_when_no_subscriptions, "topic2" use delete_when_subscriptions_caught_up // "topic3" use default:delete_when_no_subscriptions InactiveTopicPolicies inactiveTopicPolicies = @@ -555,8 +547,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { final String namespace = "prop/ns-abc"; final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await() - .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); //namespace-level default value is null assertNull(admin.namespaces().getInactiveTopicPolicies(namespace)); //topic-level default value is null diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 93ad914..879db02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -97,8 +97,6 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase { .statsInterval(0, TimeUnit.SECONDS).build(); client1.newProducer().topic(topicName).create().close(); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); - Awaitility.await() - .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); //use broker-level by default assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 1e1dca3..071160b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1236,12 +1236,6 @@ public class ReplicatorTest extends ReplicatorTestBase { String systemTopic = TopicName.get("persistent", NamespaceName.get(namespace), EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME).toString(); admin1.topics().createNonPartitionedTopic(topic); - Awaitility.await() - .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - Awaitility.await() - .until(() -> pulsar2.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); - Awaitility.await() - .until(() -> pulsar3.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); admin1.topics().setRetention(topic, new RetentionPolicies(10, 10)); admin2.topics().setRetention(topic, new RetentionPolicies(20, 20)); admin3.topics().setRetention(topic, new RetentionPolicies(30, 30)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index d0a04b6..63680db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -191,8 +191,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic TopicName topicName = TopicName.get(topic); admin.topics().createPartitionedTopic(topic, 3); pulsarClient.newProducer().topic(topic).create().close(); - Awaitility.await().untilAsserted(() - -> systemTopicBasedTopicPoliciesService.cacheIsInitialized(topicName)); admin.topics().setMaxConsumers(topic, 1000); Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 0e095e2..cb870f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -336,8 +336,6 @@ public class DelayedDeliveryTest extends ProducerConsumerBase { admin.topics().createPartitionedTopic(topicName, 3); pulsarClient.newProducer().topic(topicName).create().close(); - Awaitility.await().untilAsserted(() -> pulsar.getTopicPoliciesService() - .cacheIsInitialized(TopicName.get(topicName))); assertNull(admin.topics().getDelayedDeliveryPolicy(topicName)); DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() .tickTime(2000) @@ -372,8 +370,6 @@ public class DelayedDeliveryTest extends ProducerConsumerBase { admin.topics().createPartitionedTopic(topicName, 3); pulsarClient.newProducer().topic(topicName).create().close(); - Awaitility.await().untilAsserted(() -> pulsar.getTopicPoliciesService() - .cacheIsInitialized(TopicName.get(topicName))); assertNull(admin.topics().getDelayedDeliveryPolicy(topicName)); //1 Set topic policy DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index 48ccdba..c8f09fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -508,8 +508,6 @@ public class TopicDuplicationTest extends ProducerConsumerBase { private void waitCacheInit(String topicName) throws Exception { pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close(); TopicName topic = TopicName.get(topicName); - Awaitility.await() - .until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topic)); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java index 762f8f2..8106968 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java @@ -32,8 +32,7 @@ public class BackoffBuilder { private long mandatoryStop; private TimeUnit unitMandatoryStop; - @VisibleForTesting - BackoffBuilder() { + public BackoffBuilder() { this.initial = 0; this.max = 0; this.mandatoryStop = 0;
