This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 15bacf842c0 [fix][broker] Fix inconsensus namespace policies by
`getPoliciesIfCached` (#20855)
15bacf842c0 is described below
commit 15bacf842c0c82fc41b482bf54babfbf4210b9d8
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Jul 25 20:52:02 2023 +0800
[fix][broker] Fix inconsensus namespace policies by `getPoliciesIfCached`
(#20855)
---
.../broker/resources/NamespaceResources.java | 7 ++
.../pulsar/broker/service/AbstractTopic.java | 20 +++-
.../pulsar/broker/service/BrokerService.java | 50 ++++++++--
.../apache/pulsar/broker/service/ServerCnx.java | 110 +++++++++++----------
.../service/nonpersistent/NonPersistentTopic.java | 6 +-
.../service/persistent/DispatchRateLimiter.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 10 +-
.../service/persistent/SubscribeRateLimiter.java | 14 ++-
8 files changed, 154 insertions(+), 69 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 48f82596567..99ce288fa01 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -117,6 +117,13 @@ public class NamespaceResources extends
BaseResources<Policies> {
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
+ /**
+ * Get the namespace policy from the metadata cache. This method will not
trigger the load of metadata cache.
+ *
+ * @deprecated This method may introduce inconsistent namespace policies;
+ * use {@link NamespaceResources#getPoliciesAsync} instead.
+ */
+ @Deprecated
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH,
ns.toString()));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 106654845f8..8e17a022762 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import java.util.ArrayList;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
+import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
@@ -61,6 +63,7 @@ import
org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
@@ -1128,6 +1131,12 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return brokerService.getBrokerPublishRateLimiter();
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)};
+ * use {@link AbstractTopic#updateResourceGroupLimiter(Policies)} instead.
+ */
+ @Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
@@ -1141,17 +1150,20 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
policies = new Policies();
}
+ updateResourceGroupLimiter(policies);
+ }
+ public void updateResourceGroupLimiter(@Nonnull Policies
namespacePolicies) {
+ requireNonNull(namespacePolicies);
// attach the resource-group level rate limiters, if set
- String rgName = policies.resource_group_name;
+ String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
-
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
if (resourceGroup != null) {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter =
resourceGroup.getResourceGroupPublishLimiter();
-
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
- () -> this.enableCnxAutoRead());
+
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
this::enableCnxAutoRead);
log.info("Using resource group {} rate limiter for topic {}",
rgName, topic);
return;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5cd894b80f1..1f9996e77d9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -48,11 +49,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
@@ -68,6 +69,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import javax.annotation.Nonnull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -1943,7 +1945,7 @@ public class BrokerService implements Closeable {
}
public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
- Objects.requireNonNull(oldBundle);
+ requireNonNull(oldBundle);
try {
// retrieve all topics under existing old bundle
List<Topic> topics =
getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(),
@@ -3273,10 +3275,9 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final
TopicName topicName) {
- Optional<Policies> policies =
- pulsar.getPulsarResources().getNamespaceResources()
- .getPoliciesIfCached(topicName.getNamespaceObject());
- return isAllowAutoTopicCreationAsync(topicName, policies);
+ return pulsar.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(policies ->
isAllowAutoTopicCreationAsync(topicName, policies));
}
private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final
TopicName topicName,
@@ -3346,11 +3347,23 @@ public class BrokerService implements Closeable {
return null;
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call; use {@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
public boolean isAllowAutoSubscriptionCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoSubscriptionCreation(topicName);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call; use {@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
getAutoSubscriptionCreationOverride(topicName);
@@ -3361,6 +3374,12 @@ public class BrokerService implements Closeable {
}
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call; use {@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (topicPolicies.isPresent() &&
topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
@@ -3377,6 +3396,25 @@ public class BrokerService implements Closeable {
return null;
}
+ public @Nonnull CompletionStage<Boolean>
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
+ requireNonNull(tpName);
+ // topic level policies
+ final var topicPolicies = getTopicPolicies(tpName);
+ if (topicPolicies.isPresent() &&
topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
+ return
CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride()
+ .isAllowAutoSubscriptionCreation());
+ }
+ // namespace level policies
+ return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
+ .thenApply(policies -> {
+ if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
+ return
policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
+ }
+ // broker level policies
+ return
pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
+ });
+ }
+
public boolean isSystemTopic(String topic) {
return isSystemTopic(TopicName.get(topic));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 913faed45af..b566ed3a05f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -55,6 +55,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -1208,39 +1209,43 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.failedFuture(new
TopicNotFoundException(
"Topic " + topicName + " does
not exist"));
}
+ final Topic topic = optTopic.get();
+ return
service.isAllowAutoSubscriptionCreationAsync(topicName)
+
.thenCompose(isAllowedAutoSubscriptionCreation -> {
+ boolean
rejectSubscriptionIfDoesNotExist = isDurable
+ &&
!isAllowedAutoSubscriptionCreation
+ &&
!topic.getSubscriptions().containsKey(subscriptionName)
+ && topic.isPersistent();
+
+ if (rejectSubscriptionIfDoesNotExist) {
+ return FutureUtil
+ .failedFuture(
+ new
SubscriptionNotFoundException(
+
"Subscription does not exist"));
+ }
- Topic topic = optTopic.get();
-
- boolean rejectSubscriptionIfDoesNotExist =
isDurable
- &&
!service.isAllowAutoSubscriptionCreation(topicName.toString())
- &&
!topic.getSubscriptions().containsKey(subscriptionName)
- && topic.isPersistent();
-
- if (rejectSubscriptionIfDoesNotExist) {
- return FutureUtil
- .failedFuture(
- new
SubscriptionNotFoundException(
- "Subscription does not
exist"));
- }
-
- SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
- .subscriptionName(subscriptionName)
-
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
-
.consumerName(consumerName).isDurable(isDurable)
-
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
- .initialPosition(initialPosition)
-
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
-
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-
.subscriptionProperties(subscriptionProperties)
- .consumerEpoch(consumerEpoch)
- .schemaType(schema == null ? null :
schema.getType())
- .build();
- if (schema != null && schema.getType() !=
SchemaType.AUTO_CONSUME) {
- return
topic.addSchemaIfIdleOrCheckCompatible(schema)
- .thenCompose(v ->
topic.subscribe(option));
- } else {
- return topic.subscribe(option);
- }
+ SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
+
.subscriptionName(subscriptionName)
+
.consumerId(consumerId).subType(subType)
+ .priorityLevel(priorityLevel)
+
.consumerName(consumerName).isDurable(isDurable)
+
.startMessageId(startMessageId).metadata(metadata)
+ .readCompacted(readCompacted)
+
.initialPosition(initialPosition)
+
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
+
.replicatedSubscriptionStateArg(isReplicated)
+ .keySharedMeta(keySharedMeta)
+
.subscriptionProperties(subscriptionProperties)
+ .consumerEpoch(consumerEpoch)
+ .schemaType(schema == null ?
null : schema.getType())
+ .build();
+ if (schema != null && schema.getType()
!= SchemaType.AUTO_CONSUME) {
+ return
topic.addSchemaIfIdleOrCheckCompatible(schema)
+ .thenCompose(v ->
topic.subscribe(option));
+ } else {
+ return topic.subscribe(option);
+ }
+ });
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
@@ -1459,33 +1464,38 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future
-> {
- CompletableFuture<Subscription>
createInitSubFuture;
+ CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&&
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
- if
(!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
- String msg =
- "Could not create the initial
subscription due to the auto subscription "
- + "creation is not
allowed.";
- if (producerFuture.completeExceptionally(
- new
BrokerServiceException.NotAllowedException(msg))) {
- log.warn("[{}] {}
initialSubscriptionName: {}, topic: {}",
- remoteAddress, msg,
initialSubscriptionName, topicName);
-
commandSender.sendErrorResponse(requestId,
- ServerError.NotAllowedError,
msg);
- }
- producers.remove(producerId,
producerFuture);
- return;
- }
- createInitSubFuture =
-
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
- false, null);
+ createInitSubFuture =
service.isAllowAutoSubscriptionCreationAsync(topicName)
+
.thenCompose(isAllowAutoSubscriptionCreation -> {
+ if
(!isAllowAutoSubscriptionCreation) {
+ return
CompletableFuture.failedFuture(
+ new
BrokerServiceException.NotAllowedException(
+ "Could not create the
initial subscription due to"
+ + " the auto
subscription creation is not allowed."));
+ }
+ return
topic.createSubscription(initialSubscriptionName,
+ InitialPosition.Earliest,
false, null);
+ });
} else {
createInitSubFuture =
CompletableFuture.completedFuture(null);
}
createInitSubFuture.whenComplete((sub, ex) -> {
if (ex != null) {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
+ if (rc instanceof
BrokerServiceException.NotAllowedException) {
+ log.warn("[{}] {}
initialSubscriptionName: {}, topic: {}",
+ remoteAddress,
rc.getMessage(), initialSubscriptionName, topicName);
+ if
(producerFuture.completeExceptionally(rc)) {
+
commandSender.sendErrorResponse(requestId,
+
ServerError.NotAllowedError, rc.getMessage());
+ }
+ producers.remove(producerId,
producerFuture);
+ return;
+ }
String msg =
"Failed to create the initial
subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName:
{}, topic: {}",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b06726..c4ace2bebb6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -168,17 +168,19 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
return
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
+ final Policies policies;
if (!optPolicies.isPresent()) {
log.warn("[{}] Policies not present and
isEncryptionRequired will be set to false", topic);
isEncryptionRequired = false;
+ policies = new Policies();
} else {
- Policies policies = optPolicies.get();
+ policies = optPolicies.get();
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
}
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(policies);
return updateClusterMigrated();
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b1e48035484..800a82cc58c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -204,6 +204,12 @@ public class DispatchRateLimiter {
return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call; use {@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} instead.
+ */
+ @Deprecated
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7351e8ed75b..7223dbf3f64 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
@@ -352,7 +353,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
return;
@@ -368,7 +369,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(policies);
this.isEncryptionRequired = policies.encryption_required;
@@ -2786,7 +2787,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
@Override
- public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
+ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
+ requireNonNull(data);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic,
isEncryptionRequired,
data.encryption_required);
@@ -2808,7 +2810,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updatePublishDispatcher();
- this.updateResourceGroupLimiter(Optional.of(data));
+ updateResourceGroupLimiter(data);
List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index a052f1f6abf..58f099a9dab 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RateLimiter;
@@ -159,14 +160,21 @@ public class SubscribeRateLimiter {
}
/**
- * Gets configured subscribe-rate from namespace policies. Returns null if
subscribe-rate is not configured
- *
- * @return
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call.
*/
+ @Deprecated
public SubscribeRate getPoliciesSubscribeRate() {
return getPoliciesSubscribeRate(brokerService, topicName);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and the
+ * blocking call.
+ */
+ @Deprecated
public static SubscribeRate getPoliciesSubscribeRate(BrokerService
brokerService, final String topicName) {
final String cluster =
brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies =
DispatchRateLimiter.getPolicies(brokerService, topicName);