This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6d321e1f031 [branch-2.10][fix][broker] Fix inconsensus namespace 
policies by getPoliciesIfCached (#20873)
6d321e1f031 is described below

commit 6d321e1f03140a1b7e5a364e286dfbd969f30ba2
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jul 26 10:19:14 2023 +0800

    [branch-2.10][fix][broker] Fix inconsensus namespace policies by 
getPoliciesIfCached (#20873)
---
 .../broker/resources/NamespaceResources.java       |   7 ++
 .../broker/admin/impl/PersistentTopicsBase.java    |  21 ++--
 .../pulsar/broker/lookup/TopicLookupBase.java      |   8 +-
 .../pulsar/broker/service/AbstractTopic.java       |  20 +++-
 .../pulsar/broker/service/BrokerService.java       |  64 ++++++++++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 120 +++++++++++----------
 .../service/nonpersistent/NonPersistentTopic.java  |   6 +-
 .../service/persistent/DispatchRateLimiter.java    |   6 ++
 .../broker/service/persistent/PersistentTopic.java |  11 +-
 .../service/persistent/SubscribeRateLimiter.java   |  14 ++-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |   3 +
 11 files changed, 198 insertions(+), 82 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 74d4bcf6c47..90e3971c4cf 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -122,6 +122,13 @@ public class NamespaceResources extends 
BaseResources<Policies> {
         return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
     }
 
+    /**
+     * Get the namespace policy from the metadata cache. This method will not 
trigger the load of metadata cache.
+     *
+     * @deprecated This method may introduce inconsistent namespace 
policies; use
+     * {@link NamespaceResources#getPoliciesAsync} instead.
+     */
+    @Deprecated
     public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
         return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, 
ns.toString()));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index da37d1b7e54..f4e5caacded 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2208,9 +2208,10 @@ public class PersistentTopicsBase extends AdminResource {
                 internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
                         subscriptionName, targetMessageId, authoritative, 
replicated, properties);
             } else {
-                boolean allowAutoTopicCreation = 
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
-                getPartitionedTopicMetadataAsync(topicName,
-                        authoritative, 
allowAutoTopicCreation).thenAccept(partitionMetadata -> {
+                
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+                    .thenCompose(allowAutoTopicCreation ->
+                            getPartitionedTopicMetadataAsync(topicName, 
authoritative, allowAutoTopicCreation))
+                    .thenAccept(partitionMetadata -> {
                     final int numPartitions = partitionMetadata.partitions;
                     if (numPartitions > 0) {
                         final CompletableFuture<Void> future = new 
CompletableFuture<>();
@@ -2306,13 +2307,13 @@ public class PersistentTopicsBase extends AdminResource 
{
             MessageIdImpl targetMessageId, boolean authoritative, boolean 
replicated,
             Map<String, String> properties) {
 
-        boolean isAllowAutoTopicCreation = 
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
-
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> {
-                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE, subscriptionName);
-                    return 
pulsar().getBrokerService().getTopic(topicName.toString(), 
isAllowAutoTopicCreation);
-                }).thenApply(optTopic -> {
+        pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+            .thenCompose(isAllowAutoTopicCreation -> 
validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(__ -> {
+                        validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE, subscriptionName);
+                        return 
pulsar().getBrokerService().getTopic(topicName.toString(), 
isAllowAutoTopicCreation);
+                    }))
+           .thenApply(optTopic -> {
             if (optTopic.isPresent()) {
                 return optTopic.get();
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index ae1d2a5bab0..7baebc7d1b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -83,9 +83,11 @@ public class TopicLookupBase extends PulsarWebResource {
 
         // Currently, it's hard to check the non-persistent-non-partitioned 
topic, because it only exists in the broker,
         // it doesn't have metadata. If the topic is non-persistent and 
non-partitioned, we'll return the true flag.
-        CompletableFuture<Boolean> existFuture = 
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)
-                || (!topicName.isPersistent() && !topicName.isPartitioned())
-                ? CompletableFuture.completedFuture(true) : 
pulsar().getNamespaceService().checkTopicExists(topicName);
+        CompletableFuture<Boolean> existFuture = 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+                .thenCompose(isAllowAutoTopicCreation ->
+                        isAllowAutoTopicCreation || (!topicName.isPersistent() 
&& !topicName.isPartitioned())
+                         ? CompletableFuture.completedFuture(true) :
+                                
pulsar().getNamespaceService().checkTopicExists(topicName));
         existFuture.thenAccept(exist -> {
             if (!exist) {
                 completeLookupResponseExceptionally(asyncResponse, new 
RestException(Response.Status.NOT_FOUND,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 736fcf1f5e1..19abb22b017 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.ToLongFunction;
+import javax.annotation.Nonnull;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.collections4.CollectionUtils;
@@ -57,6 +59,7 @@ import 
org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
@@ -1065,6 +1068,12 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return brokerService.getBrokerPublishRateLimiter();
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}.
+     * Use {@link AbstractTopic#updateResourceGroupLimiter(Policies)} instead.
+     */
+    @Deprecated
     public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
         Policies policies;
         try {
@@ -1078,17 +1087,20 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
             policies = new Policies();
         }
+        updateResourceGroupLimiter(policies);
+    }
 
+    public void updateResourceGroupLimiter(@Nonnull Policies 
namespacePolicies) {
+        requireNonNull(namespacePolicies);
         // attach the resource-group level rate limiters, if set
-        String rgName = policies.resource_group_name;
+        String rgName = namespacePolicies.resource_group_name;
         if (rgName != null) {
             final ResourceGroup resourceGroup =
-              
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+                    
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
             if (resourceGroup != null) {
                 this.resourceGroupRateLimitingEnabled = true;
                 this.resourceGroupPublishLimiter = 
resourceGroup.getResourceGroupPublishLimiter();
-                
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
-                  () -> this.enableCnxAutoRead());
+                
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), 
this::enableCnxAutoRead);
                 log.info("Using resource group {} rate limiter for topic {}", 
rgName, topic);
                 return;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 50bfe6d3df5..28ef4d35ec8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -56,6 +57,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -72,6 +74,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import javax.annotation.Nonnull;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -931,7 +934,15 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, 
isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
+        final TopicName topicName;
+        try {
+             topicName = TopicName.get(topic);
+        } catch (Throwable ex) {
+            return FutureUtil.failedFuture(ex);
+        }
+        return isAllowAutoTopicCreationAsync(topicName)
+                .thenCompose(isAllowAutoTopicCreation -> getTopic(topic, 
isAllowAutoTopicCreation)
+                        .thenApply(Optional::get));
     }
 
     public CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing) {
@@ -2718,7 +2729,8 @@ public class BrokerService implements Closeable {
                                     if (metadata.partitions == 0
                                             && !topicExists
                                             && !topicName.isPartitioned()
-                                            && 
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
+                                            && pulsar.getBrokerService()
+                                                
.isAllowAutoTopicCreation(topicName, policies)
                                             && pulsar.getBrokerService()
                                                             
.isDefaultTopicTypePartitioned(topicName, policies)) {
 
@@ -2944,11 +2956,23 @@ public class BrokerService implements Closeable {
         cnxSet.forEach(consumer);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}.
+     * Use {@link 
BrokerService#isAllowAutoTopicCreationAsync(TopicName)} instead.
+     */
+    @Deprecated
     public boolean isAllowAutoTopicCreation(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return isAllowAutoTopicCreation(topicName);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}.
+     * Use {@link 
BrokerService#isAllowAutoTopicCreationAsync(TopicName)} instead.
+     */
+    @Deprecated
     public boolean isAllowAutoTopicCreation(final TopicName topicName) {
         Optional<Policies> policies =
                 pulsar.getPulsarResources().getNamespaceResources()
@@ -2956,6 +2980,11 @@ public class BrokerService implements Closeable {
         return isAllowAutoTopicCreation(topicName, policies);
     }
 
+    public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final 
TopicName topicName) {
+        return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+                .thenApply(policies -> isAllowAutoTopicCreation(topicName, 
policies));
+    }
+
     public boolean isAllowAutoTopicCreation(final TopicName topicName, final 
Optional<Policies> policies) {
         if (policies.isPresent() && policies.get().deleted) {
             log.info("Preventing AutoTopicCreation on a namespace that is 
being deleted {}",
@@ -3002,11 +3031,23 @@ public class BrokerService implements Closeable {
         return null;
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls. Use {@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+     */
+    @Deprecated
     public boolean isAllowAutoSubscriptionCreation(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return isAllowAutoSubscriptionCreation(topicName);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls. Use {@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+     */
+    @Deprecated
     public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
         AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
                 getAutoSubscriptionCreationOverride(topicName);
@@ -3017,6 +3058,12 @@ public class BrokerService implements Closeable {
         }
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls. Use {@link 
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+     */
+    @Deprecated
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
         Optional<Policies> policies =
                 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
@@ -3028,6 +3075,19 @@ public class BrokerService implements Closeable {
         return null;
     }
 
+    public @Nonnull CompletionStage<Boolean> 
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
+        requireNonNull(tpName);
+        // namespace level policies
+        return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
+                .thenApply(policies -> {
+                    if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
+                        return 
policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
+                    }
+                    // broker level policies
+                    return 
pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
+                });
+    }
+
     public boolean isSystemTopic(String topic) {
         return isSystemTopic(TopicName.get(topic));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c9d8a2ff700..ef7cfdf14d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -53,6 +53,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -1034,48 +1035,53 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     return null;
                 }
 
-                boolean createTopicIfDoesNotExist = forceTopicCreation
-                        && 
service.isAllowAutoTopicCreation(topicName.toString());
-
-                service.getTopic(topicName.toString(), 
createTopicIfDoesNotExist)
+                service.isAllowAutoTopicCreationAsync(topicName)
+                        .thenCompose(isAllowAutoTopicCreation -> {
+                            final boolean createTopicIfDoesNotExist = 
forceTopicCreation && isAllowAutoTopicCreation;
+                            return service.getTopic(topicName.toString(), 
createTopicIfDoesNotExist);
+                        })
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
                                 return FutureUtil
                                         .failedFuture(new 
TopicNotFoundException(
                                                 "Topic " + topicName + " does 
not exist"));
                             }
-
-                            Topic topic = optTopic.get();
-
-                            boolean rejectSubscriptionIfDoesNotExist = 
isDurable
-                                && 
!service.isAllowAutoSubscriptionCreation(topicName.toString())
-                                && 
!topic.getSubscriptions().containsKey(subscriptionName)
-                                && topic.isPersistent();
-
-                            if (rejectSubscriptionIfDoesNotExist) {
-                                return FutureUtil
-                                        .failedFuture(
-                                                new 
SubscriptionNotFoundException(
-                                                        "Subscription does not 
exist"));
-                            }
-
-                            SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
-                                    .subscriptionName(subscriptionName)
-                                    
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
-                                    
.consumerName(consumerName).isDurable(isDurable)
-                                    
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
-                                    .initialPosition(initialPosition)
-                                    
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
-                                    
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-                                    
.subscriptionProperties(subscriptionProperties)
-                                    .consumerEpoch(consumerEpoch)
-                                    .build();
-                            if (schema != null) {
-                                return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
-                                        .thenCompose(v -> 
topic.subscribe(option));
-                            } else {
-                                return topic.subscribe(option);
-                            }
+                            final Topic topic = optTopic.get();
+                            return 
service.isAllowAutoSubscriptionCreationAsync(topicName)
+                                    
.thenCompose(isAllowAutoSubscriptionCreation -> {
+                                        boolean 
rejectSubscriptionIfDoesNotExist = isDurable
+                                                && 
!isAllowAutoSubscriptionCreation
+                                                && 
!topic.getSubscriptions().containsKey(subscriptionName)
+                                                && topic.isPersistent();
+
+                                        if (rejectSubscriptionIfDoesNotExist) {
+                                            return FutureUtil
+                                                    .failedFuture(
+                                                            new 
SubscriptionNotFoundException(
+                                                                    
"Subscription does not exist"));
+                                        }
+
+                                        SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
+                                                
.subscriptionName(subscriptionName)
+                                                
.consumerId(consumerId).subType(subType)
+                                                .priorityLevel(priorityLevel)
+                                                
.consumerName(consumerName).isDurable(isDurable)
+                                                
.startMessageId(startMessageId).metadata(metadata)
+                                                .readCompacted(readCompacted)
+                                                
.initialPosition(initialPosition)
+                                                
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
+                                                
.replicatedSubscriptionStateArg(isReplicated)
+                                                .keySharedMeta(keySharedMeta)
+                                                
.subscriptionProperties(subscriptionProperties)
+                                                .consumerEpoch(consumerEpoch)
+                                                .build();
+                                        if (schema != null) {
+                                            return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
+                                                    .thenCompose(v -> 
topic.subscribe(option));
+                                        } else {
+                                            return topic.subscribe(option);
+                                        }
+                                    });
                         })
                         .thenAccept(consumer -> {
                             if (consumerFuture.complete(consumer)) {
@@ -1286,33 +1292,39 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                     schemaVersionFuture.thenAccept(schemaVersion -> {
                         
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future 
-> {
-                            CompletableFuture<Subscription> 
createInitSubFuture;
+                            CompletionStage<Subscription> createInitSubFuture;
                             if (!Strings.isNullOrEmpty(initialSubscriptionName)
                                     && topic.isPersistent()
                                     && 
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
-                                if 
(!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
-                                    String msg =
-                                            "Could not create the initial 
subscription due to the auto subscription "
-                                                    + "creation is not 
allowed.";
-                                    if (producerFuture.completeExceptionally(
-                                            new 
BrokerServiceException.NotAllowedException(msg))) {
-                                        log.warn("[{}] {} 
initialSubscriptionName: {}, topic: {}",
-                                                remoteAddress, msg, 
initialSubscriptionName, topicName);
-                                        
commandSender.sendErrorResponse(requestId,
-                                                ServerError.NotAllowedError, 
msg);
-                                    }
-                                    producers.remove(producerId, 
producerFuture);
-                                    return;
-                                }
-                                createInitSubFuture =
-                                        
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
-                                                false, null);
+                                createInitSubFuture = 
service.isAllowAutoSubscriptionCreationAsync(topicName)
+                                    
.thenCompose(isAllowAutoSubscriptionCreation -> {
+                                        if (!isAllowAutoSubscriptionCreation) {
+                                            return FutureUtil.failedFuture(
+                                                    new 
BrokerServiceException.NotAllowedException(
+                                                            "Could not create 
the initial subscription due to"
+                                                            + " the auto 
subscription creation is not allowed."));
+                                        }
+                                        return 
topic.createSubscription(initialSubscriptionName,
+                                                InitialPosition.Earliest, 
false, null);
+                                    });
                             } else {
                                 createInitSubFuture = 
CompletableFuture.completedFuture(null);
                             }
 
                             createInitSubFuture.whenComplete((sub, ex) -> {
                                 if (ex != null) {
+                                    final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
+                                    if (rc instanceof 
BrokerServiceException.NotAllowedException) {
+                                        log.warn("[{}] {} 
initialSubscriptionName: {}, topic: {}",
+                                                remoteAddress, 
rc.getMessage(), initialSubscriptionName, topicName);
+                                        if 
(producerFuture.completeExceptionally(rc)) {
+                                            
commandSender.sendErrorResponse(requestId,
+                                                    
ServerError.NotAllowedError, rc.getMessage());
+                                        }
+                                        producers.remove(producerId, 
producerFuture);
+                                        return;
+                                    }
+
                                     String msg =
                                             "Failed to create the initial 
subscription: " + ex.getCause().getMessage();
                                     log.warn("[{}] {} initialSubscriptionName: 
{}, topic: {}",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 54f738101be..a021e7ce94d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -157,18 +157,20 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                 .thenAccept(optPolicies -> {
+                    final Policies policies;
                     if (!optPolicies.isPresent()) {
                         log.warn("[{}] Policies not present and 
isEncryptionRequired will be set to false", topic);
                         isEncryptionRequired = false;
+                        policies = new Policies();
                     } else {
-                        Policies policies = optPolicies.get();
+                        policies = optPolicies.get();
                         updateTopicPolicyByNamespacePolicy(policies);
                         isEncryptionRequired = policies.encryption_required;
                         isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
                         schemaValidationEnforced = 
policies.schema_validation_enforced;
                     }
                     updatePublishDispatcher();
-                    updateResourceGroupLimiter(optPolicies);
+                    updateResourceGroupLimiter(policies);
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index cd7f5e9ea64..568ae0915cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -364,6 +364,12 @@ public class DispatchRateLimiter {
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls. Use {@link 
DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} instead.
+     */
+    @Deprecated
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5054b039af7..ec174a41d63 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
 import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
@@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import lombok.Getter;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -316,7 +318,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     if (!optPolicies.isPresent()) {
                         isEncryptionRequired = false;
                         updatePublishDispatcher();
-                        updateResourceGroupLimiter(optPolicies);
+                        updateResourceGroupLimiter(new Policies());
                         initializeDispatchRateLimiterIfNeeded();
                         updateSubscribeRateLimiter();
                         return;
@@ -332,7 +334,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
                     updatePublishDispatcher();
 
-                    updateResourceGroupLimiter(optPolicies);
+                    updateResourceGroupLimiter(policies);
 
                     this.isEncryptionRequired = policies.encryption_required;
 
@@ -2419,7 +2421,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     @Override
-    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
+    public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
+        requireNonNull(data);
         if (log.isDebugEnabled()) {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, 
isEncryptionRequired,
                     data.encryption_required);
@@ -2443,7 +2446,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         updatePublishDispatcher();
 
-        this.updateResourceGroupLimiter(Optional.of(data));
+        updateResourceGroupLimiter(data);
 
         List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
         producers.values().forEach(producer -> producerCheckFutures.add(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 89af6f6be88..f602659400f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.RateLimiter;
@@ -159,14 +160,21 @@ public class SubscribeRateLimiter {
     }
 
     /**
-     * Gets configured subscribe-rate from namespace policies. Returns null if 
subscribe-rate is not configured
-     *
-     * @return
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls.
      */
+    @Deprecated
     public SubscribeRate getPoliciesSubscribeRate() {
         return getPoliciesSubscribeRate(brokerService, topicName);
     }
 
+    /**
+     * @deprecated Avoid using the deprecated method
+     * {@link 
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
 and blocking
+     * calls.
+     */
+    @Deprecated
     public static SubscribeRate getPoliciesSubscribeRate(BrokerService 
brokerService, final String topicName) {
         final String cluster = 
brokerService.pulsar().getConfiguration().getClusterName();
         final Optional<Policies> policies = 
DispatchRateLimiter.getPolicies(brokerService, topicName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 5ae60762748..5906c6f4fe2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -142,6 +142,9 @@ public class HttpTopicLookupv2Test {
         doReturn(uri).when(uriInfo).getRequestUri();
         doReturn(true).when(config).isAuthorizationEnabled();
 
+        BrokerService brokerService = pulsar.getBrokerService();
+        doReturn(CompletableFuture.completedFuture(false))
+                .when(brokerService).isAllowAutoTopicCreationAsync(any());
         NamespaceService namespaceService = pulsar.getNamespaceService();
         CompletableFuture<Boolean> future = new CompletableFuture<>();
         future.complete(false);


Reply via email to