This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 15bacf842c0 [fix][broker] Fix inconsensus namespace policies by 
`getPoliciesIfCached` (#20855)
15bacf842c0 is described below

commit 15bacf842c0c82fc41b482bf54babfbf4210b9d8
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Jul 25 20:52:02 2023 +0800

    [fix][broker] Fix inconsensus namespace policies by `getPoliciesIfCached` 
(#20855)
---
 .../broker/resources/NamespaceResources.java       |   7 ++
 .../pulsar/broker/service/AbstractTopic.java       |  20 +++-
 .../pulsar/broker/service/BrokerService.java       |  50 ++++++++--
 .../apache/pulsar/broker/service/ServerCnx.java    | 110 +++++++++++----------
 .../service/nonpersistent/NonPersistentTopic.java  |   6 +-
 .../service/persistent/DispatchRateLimiter.java    |   6 ++
 .../broker/service/persistent/PersistentTopic.java |  10 +-
 .../service/persistent/SubscribeRateLimiter.java   |  14 ++-
 8 files changed, 154 insertions(+), 69 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 48f82596567..99ce288fa01 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -117,6 +117,13 @@ public class NamespaceResources extends 
BaseResources<Policies> {
         return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
     }
 
+    /**
+     * Get the namespace policy from the metadata cache. This method will not 
trigger the load of metadata cache.
+     *
+     * @deprecated Since this method may introduce inconsistent namespace 
policies. we should use
+     * #{@link NamespaceResources#getPoliciesAsync}
+     */
+    @Deprecated
     public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
         return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, 
ns.toString()));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 106654845f8..8e17a022762 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import com.google.common.base.MoreObjects;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.ToLongFunction;
+import javax.annotation.Nonnull;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.collections4.CollectionUtils;
@@ -61,6 +63,7 @@ import 
org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
@@ -1128,6 +1131,12 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return brokerService.getBrokerPublishRateLimiter();
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and we can use
+     * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead 
of it.
+     */
+    @Deprecated
     public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
         Policies policies;
         try {
@@ -1141,17 +1150,20 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
             policies = new Policies();
         }
+        updateResourceGroupLimiter(policies);
+    }
 
+    public void updateResourceGroupLimiter(@Nonnull Policies 
namespacePolicies) {
+        requireNonNull(namespacePolicies);
         // attach the resource-group level rate limiters, if set
-        String rgName = policies.resource_group_name;
+        String rgName = namespacePolicies.resource_group_name;
         if (rgName != null) {
             final ResourceGroup resourceGroup =
-              
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+                    
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
             if (resourceGroup != null) {
                 this.resourceGroupRateLimitingEnabled = true;
                 this.resourceGroupPublishLimiter = 
resourceGroup.getResourceGroupPublishLimiter();
-                
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
-                  () -> this.enableCnxAutoRead());
+                
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), 
this::enableCnxAutoRead);
                 log.info("Using resource group {} rate limiter for topic {}", 
rgName, topic);
                 return;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5cd894b80f1..1f9996e77d9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
 import static org.apache.commons.collections4.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -48,11 +49,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
@@ -68,6 +69,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import javax.annotation.Nonnull;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -1943,7 +1945,7 @@ public class BrokerService implements Closeable {
     }
 
     public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
-        Objects.requireNonNull(oldBundle);
+        requireNonNull(oldBundle);
         try {
             // retrieve all topics under existing old bundle
             List<Topic> topics = 
getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(),
@@ -3273,10 +3275,9 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final 
TopicName topicName) {
-        Optional<Policies> policies =
-                pulsar.getPulsarResources().getNamespaceResources()
-                        .getPoliciesIfCached(topicName.getNamespaceObject());
-        return isAllowAutoTopicCreationAsync(topicName, policies);
+        return pulsar.getPulsarResources().getNamespaceResources()
+                        .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(policies -> 
isAllowAutoTopicCreationAsync(topicName, policies));
     }
 
     private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final 
TopicName topicName,
@@ -3346,11 +3347,23 @@ public class BrokerService implements Closeable {
         return null;
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call. we can use #{@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
+     */
+    @Deprecated
     public boolean isAllowAutoSubscriptionCreation(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return isAllowAutoSubscriptionCreation(topicName);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call. we can use #{@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
+     */
+    @Deprecated
     public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
         AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
                 getAutoSubscriptionCreationOverride(topicName);
@@ -3361,6 +3374,12 @@ public class BrokerService implements Closeable {
         }
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call. we can use #{@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
+     */
+    @Deprecated
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
         Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
         if (topicPolicies.isPresent() && 
topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
@@ -3377,6 +3396,25 @@ public class BrokerService implements Closeable {
         return null;
     }
 
+    public @Nonnull CompletionStage<Boolean> 
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
+        requireNonNull(tpName);
+        // topic level policies
+        final var topicPolicies = getTopicPolicies(tpName);
+        if (topicPolicies.isPresent() && 
topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
+            return 
CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride()
+                    .isAllowAutoSubscriptionCreation());
+        }
+        // namespace level policies
+        return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
+                .thenApply(policies -> {
+                    if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
+                        return 
policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
+                    }
+                    // broker level policies
+                    return 
pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
+                });
+    }
+
     public boolean isSystemTopic(String topic) {
         return isSystemTopic(TopicName.get(topic));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 913faed45af..b566ed3a05f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -55,6 +55,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -1208,39 +1209,43 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                         .failedFuture(new 
TopicNotFoundException(
                                                 "Topic " + topicName + " does 
not exist"));
                             }
+                            final Topic topic = optTopic.get();
+                            return 
service.isAllowAutoSubscriptionCreationAsync(topicName)
+                                    
.thenCompose(isAllowedAutoSubscriptionCreation -> {
+                                        boolean 
rejectSubscriptionIfDoesNotExist = isDurable
+                                                && 
!isAllowedAutoSubscriptionCreation
+                                                && 
!topic.getSubscriptions().containsKey(subscriptionName)
+                                                && topic.isPersistent();
+
+                                        if (rejectSubscriptionIfDoesNotExist) {
+                                            return FutureUtil
+                                                    .failedFuture(
+                                                            new 
SubscriptionNotFoundException(
+                                                                    
"Subscription does not exist"));
+                                        }
 
-                            Topic topic = optTopic.get();
-
-                            boolean rejectSubscriptionIfDoesNotExist = 
isDurable
-                                && 
!service.isAllowAutoSubscriptionCreation(topicName.toString())
-                                && 
!topic.getSubscriptions().containsKey(subscriptionName)
-                                && topic.isPersistent();
-
-                            if (rejectSubscriptionIfDoesNotExist) {
-                                return FutureUtil
-                                        .failedFuture(
-                                                new 
SubscriptionNotFoundException(
-                                                        "Subscription does not 
exist"));
-                            }
-
-                            SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
-                                    .subscriptionName(subscriptionName)
-                                    
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
-                                    
.consumerName(consumerName).isDurable(isDurable)
-                                    
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
-                                    .initialPosition(initialPosition)
-                                    
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
-                                    
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-                                    
.subscriptionProperties(subscriptionProperties)
-                                    .consumerEpoch(consumerEpoch)
-                                    .schemaType(schema == null ? null : 
schema.getType())
-                                    .build();
-                            if (schema != null && schema.getType() != 
SchemaType.AUTO_CONSUME) {
-                                return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
-                                        .thenCompose(v -> 
topic.subscribe(option));
-                            } else {
-                                return topic.subscribe(option);
-                            }
+                                        SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
+                                                
.subscriptionName(subscriptionName)
+                                                
.consumerId(consumerId).subType(subType)
+                                                .priorityLevel(priorityLevel)
+                                                
.consumerName(consumerName).isDurable(isDurable)
+                                                
.startMessageId(startMessageId).metadata(metadata)
+                                                .readCompacted(readCompacted)
+                                                
.initialPosition(initialPosition)
+                                                
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
+                                                
.replicatedSubscriptionStateArg(isReplicated)
+                                                .keySharedMeta(keySharedMeta)
+                                                
.subscriptionProperties(subscriptionProperties)
+                                                .consumerEpoch(consumerEpoch)
+                                                .schemaType(schema == null ? 
null : schema.getType())
+                                                .build();
+                                        if (schema != null && schema.getType() 
!= SchemaType.AUTO_CONSUME) {
+                                            return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
+                                                    .thenCompose(v -> 
topic.subscribe(option));
+                                        } else {
+                                            return topic.subscribe(option);
+                                        }
+                                    });
                         })
                         .thenAccept(consumer -> {
                             if (consumerFuture.complete(consumer)) {
@@ -1459,33 +1464,38 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                     schemaVersionFuture.thenAccept(schemaVersion -> {
                         
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future 
-> {
-                            CompletableFuture<Subscription> 
createInitSubFuture;
+                            CompletionStage<Subscription> createInitSubFuture;
                             if (!Strings.isNullOrEmpty(initialSubscriptionName)
                                     && topic.isPersistent()
                                     && 
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
-                                if 
(!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
-                                    String msg =
-                                            "Could not create the initial 
subscription due to the auto subscription "
-                                                    + "creation is not 
allowed.";
-                                    if (producerFuture.completeExceptionally(
-                                            new 
BrokerServiceException.NotAllowedException(msg))) {
-                                        log.warn("[{}] {} 
initialSubscriptionName: {}, topic: {}",
-                                                remoteAddress, msg, 
initialSubscriptionName, topicName);
-                                        
commandSender.sendErrorResponse(requestId,
-                                                ServerError.NotAllowedError, 
msg);
-                                    }
-                                    producers.remove(producerId, 
producerFuture);
-                                    return;
-                                }
-                                createInitSubFuture =
-                                        
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
-                                                false, null);
+                                createInitSubFuture = 
service.isAllowAutoSubscriptionCreationAsync(topicName)
+                                        
.thenCompose(isAllowAutoSubscriptionCreation -> {
+                                            if 
(!isAllowAutoSubscriptionCreation) {
+                                                return 
CompletableFuture.failedFuture(
+                                                        new 
BrokerServiceException.NotAllowedException(
+                                                        "Could not create the 
initial subscription due to"
+                                                            + " the auto 
subscription creation is not allowed."));
+                                            }
+                                            return 
topic.createSubscription(initialSubscriptionName,
+                                                    InitialPosition.Earliest, 
false, null);
+                                        });
                             } else {
                                 createInitSubFuture = 
CompletableFuture.completedFuture(null);
                             }
 
                             createInitSubFuture.whenComplete((sub, ex) -> {
                                 if (ex != null) {
+                                    final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
+                                    if (rc instanceof 
BrokerServiceException.NotAllowedException) {
+                                        log.warn("[{}] {} 
initialSubscriptionName: {}, topic: {}",
+                                                remoteAddress, 
rc.getMessage(), initialSubscriptionName, topicName);
+                                        if 
(producerFuture.completeExceptionally(rc)) {
+                                            
commandSender.sendErrorResponse(requestId,
+                                                    
ServerError.NotAllowedError, rc.getMessage());
+                                        }
+                                        producers.remove(producerId, 
producerFuture);
+                                        return;
+                                    }
                                     String msg =
                                             "Failed to create the initial 
subscription: " + ex.getCause().getMessage();
                                     log.warn("[{}] {} initialSubscriptionName: 
{}, topic: {}",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b06726..c4ace2bebb6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -168,17 +168,19 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                 .thenCompose(optPolicies -> {
+                    final Policies policies;
                     if (!optPolicies.isPresent()) {
                         log.warn("[{}] Policies not present and 
isEncryptionRequired will be set to false", topic);
                         isEncryptionRequired = false;
+                        policies = new Policies();
                     } else {
-                        Policies policies = optPolicies.get();
+                        policies = optPolicies.get();
                         updateTopicPolicyByNamespacePolicy(policies);
                         isEncryptionRequired = policies.encryption_required;
                         isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
                     }
                     updatePublishDispatcher();
-                    updateResourceGroupLimiter(optPolicies);
+                    updateResourceGroupLimiter(policies);
                     return updateClusterMigrated();
                 });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b1e48035484..800a82cc58c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -204,6 +204,12 @@ public class DispatchRateLimiter {
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call. we can use #{@link 
DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it.
+     */
+    @Deprecated
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7351e8ed75b..7223dbf3f64 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
@@ -352,7 +353,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     if (!optPolicies.isPresent()) {
                         isEncryptionRequired = false;
                         updatePublishDispatcher();
-                        updateResourceGroupLimiter(optPolicies);
+                        updateResourceGroupLimiter(new Policies());
                         initializeDispatchRateLimiterIfNeeded();
                         updateSubscribeRateLimiter();
                         return;
@@ -368,7 +369,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
                     updatePublishDispatcher();
 
-                    updateResourceGroupLimiter(optPolicies);
+                    updateResourceGroupLimiter(policies);
 
                     this.isEncryptionRequired = policies.encryption_required;
 
@@ -2786,7 +2787,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     @Override
-    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
+    public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
+        requireNonNull(data);
         if (log.isDebugEnabled()) {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, 
isEncryptionRequired,
                     data.encryption_required);
@@ -2808,7 +2810,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         updatePublishDispatcher();
 
-        this.updateResourceGroupLimiter(Optional.of(data));
+        updateResourceGroupLimiter(data);
 
         List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
         producers.values().forEach(producer -> producerCheckFutures.add(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index a052f1f6abf..58f099a9dab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.RateLimiter;
@@ -159,14 +160,21 @@ public class SubscribeRateLimiter {
     }
 
     /**
-     * Gets configured subscribe-rate from namespace policies. Returns null if 
subscribe-rate is not configured
-     *
-     * @return
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call.
      */
+    @Deprecated
     public SubscribeRate getPoliciesSubscribeRate() {
         return getPoliciesSubscribeRate(brokerService, topicName);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * #{@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * call.
+     */
+    @Deprecated
     public static SubscribeRate getPoliciesSubscribeRate(BrokerService 
brokerService, final String topicName) {
         final String cluster = 
brokerService.pulsar().getConfiguration().getClusterName();
         final Optional<Policies> policies = 
DispatchRateLimiter.getPolicies(brokerService, topicName);

Reply via email to