This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6d321e1f031 [branch-2.10][fix][broker] Fix inconsensus namespace
policies by getPoliciesIfCached (#20873)
6d321e1f031 is described below
commit 6d321e1f03140a1b7e5a364e286dfbd969f30ba2
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jul 26 10:19:14 2023 +0800
[branch-2.10][fix][broker] Fix inconsensus namespace policies by
getPoliciesIfCached (#20873)
---
.../broker/resources/NamespaceResources.java | 7 ++
.../broker/admin/impl/PersistentTopicsBase.java | 21 ++--
.../pulsar/broker/lookup/TopicLookupBase.java | 8 +-
.../pulsar/broker/service/AbstractTopic.java | 20 +++-
.../pulsar/broker/service/BrokerService.java | 64 ++++++++++-
.../apache/pulsar/broker/service/ServerCnx.java | 120 +++++++++++----------
.../service/nonpersistent/NonPersistentTopic.java | 6 +-
.../service/persistent/DispatchRateLimiter.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 11 +-
.../service/persistent/SubscribeRateLimiter.java | 14 ++-
.../broker/lookup/http/HttpTopicLookupv2Test.java | 3 +
11 files changed, 198 insertions(+), 82 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 74d4bcf6c47..90e3971c4cf 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -122,6 +122,13 @@ public class NamespaceResources extends
BaseResources<Policies> {
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
+ /**
+ * Get the namespace policy from the metadata cache. This method will not
trigger the load of metadata cache.
+ *
+ * @deprecated This method may introduce inconsistent namespace
policies. Use
+ * {@link NamespaceResources#getPoliciesAsync} instead.
+ */
+ @Deprecated
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH,
ns.toString()));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index da37d1b7e54..f4e5caacded 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2208,9 +2208,10 @@ public class PersistentTopicsBase extends AdminResource {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative,
replicated, properties);
} else {
- boolean allowAutoTopicCreation =
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
- getPartitionedTopicMetadataAsync(topicName,
- authoritative,
allowAutoTopicCreation).thenAccept(partitionMetadata -> {
+
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+ .thenCompose(allowAutoTopicCreation ->
+ getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation))
+ .thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new
CompletableFuture<>();
@@ -2306,13 +2307,13 @@ public class PersistentTopicsBase extends AdminResource
{
MessageIdImpl targetMessageId, boolean authoritative, boolean
replicated,
Map<String, String> properties) {
- boolean isAllowAutoTopicCreation =
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
-
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> {
- validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE, subscriptionName);
- return
pulsar().getBrokerService().getTopic(topicName.toString(),
isAllowAutoTopicCreation);
- }).thenApply(optTopic -> {
+ pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+ .thenCompose(isAllowAutoTopicCreation ->
validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> {
+ validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE, subscriptionName);
+ return
pulsar().getBrokerService().getTopic(topicName.toString(),
isAllowAutoTopicCreation);
+ }))
+ .thenApply(optTopic -> {
if (optTopic.isPresent()) {
return optTopic.get();
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index ae1d2a5bab0..7baebc7d1b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -83,9 +83,11 @@ public class TopicLookupBase extends PulsarWebResource {
// Currently, it's hard to check the non-persistent-non-partitioned
topic, because it only exists in the broker,
// it doesn't have metadata. If the topic is non-persistent and
non-partitioned, we'll return the true flag.
- CompletableFuture<Boolean> existFuture =
pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)
- || (!topicName.isPersistent() && !topicName.isPartitioned())
- ? CompletableFuture.completedFuture(true) :
pulsar().getNamespaceService().checkTopicExists(topicName);
+ CompletableFuture<Boolean> existFuture =
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+ .thenCompose(isAllowAutoTopicCreation ->
+ isAllowAutoTopicCreation || (!topicName.isPersistent()
&& !topicName.isPartitioned())
+ ? CompletableFuture.completedFuture(true) :
+
pulsar().getNamespaceService().checkTopicExists(topicName));
existFuture.thenAccept(exist -> {
if (!exist) {
completeLookupResponseExceptionally(asyncResponse, new
RestException(Response.Status.NOT_FOUND,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 736fcf1f5e1..19abb22b017 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
+import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
@@ -57,6 +59,7 @@ import
org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
@@ -1065,6 +1068,12 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return brokerService.getBrokerPublishRateLimiter();
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and use
+ * {@link AbstractTopic#updateResourceGroupLimiter(Policies)} instead
of it.
+ */
+ @Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
@@ -1078,17 +1087,20 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
policies = new Policies();
}
+ updateResourceGroupLimiter(policies);
+ }
+ public void updateResourceGroupLimiter(@Nonnull Policies
namespacePolicies) {
+ requireNonNull(namespacePolicies);
// attach the resource-group level rate limiters, if set
- String rgName = policies.resource_group_name;
+ String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
-
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
if (resourceGroup != null) {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter =
resourceGroup.getResourceGroupPublishLimiter();
-
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
- () -> this.enableCnxAutoRead());
+
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
this::enableCnxAutoRead);
log.info("Using resource group {} rate limiter for topic {}",
rgName, topic);
return;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 50bfe6d3df5..28ef4d35ec8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -56,6 +57,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -72,6 +74,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import javax.annotation.Nonnull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -931,7 +934,15 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
- return getTopic(topic,
isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
+ final TopicName topicName;
+ try {
+ topicName = TopicName.get(topic);
+ } catch (Throwable ex) {
+ return FutureUtil.failedFuture(ex);
+ }
+ return isAllowAutoTopicCreationAsync(topicName)
+ .thenCompose(isAllowAutoTopicCreation -> getTopic(topic,
isAllowAutoTopicCreation)
+ .thenApply(Optional::get));
}
public CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing) {
@@ -2718,7 +2729,8 @@ public class BrokerService implements Closeable {
if (metadata.partitions == 0
&& !topicExists
&& !topicName.isPartitioned()
- &&
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
+ && pulsar.getBrokerService()
+
.isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService()
.isDefaultTopicTypePartitioned(topicName, policies)) {
@@ -2944,11 +2956,23 @@ public class BrokerService implements Closeable {
cnxSet.forEach(consumer);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * #{@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
+ * You can use #{@link
BrokerService#isAllowAutoTopicCreationAsync(TopicName)}
+ */
+ @Deprecated
public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * #{@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
+ * You can use #{@link
BrokerService#isAllowAutoTopicCreationAsync(TopicName)}
+ */
+ @Deprecated
public boolean isAllowAutoTopicCreation(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
@@ -2956,6 +2980,11 @@ public class BrokerService implements Closeable {
return isAllowAutoTopicCreation(topicName, policies);
}
+ public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final
TopicName topicName) {
+ return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+ .thenApply(policies -> isAllowAutoTopicCreation(topicName,
policies));
+ }
+
public boolean isAllowAutoTopicCreation(final TopicName topicName, final
Optional<Policies> policies) {
if (policies.isPresent() && policies.get().deleted) {
log.info("Preventing AutoTopicCreation on a namespace that is
being deleted {}",
@@ -3002,11 +3031,23 @@ public class BrokerService implements Closeable {
return null;
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call. Use {@link
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
public boolean isAllowAutoSubscriptionCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoSubscriptionCreation(topicName);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call. Use {@link
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
getAutoSubscriptionCreationOverride(topicName);
@@ -3017,6 +3058,12 @@ public class BrokerService implements Closeable {
}
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call. Use {@link
BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} instead.
+ */
+ @Deprecated
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
@@ -3028,6 +3075,19 @@ public class BrokerService implements Closeable {
return null;
}
+ public @Nonnull CompletionStage<Boolean>
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
+ requireNonNull(tpName);
+ // namespace level policies
+ return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
+ .thenApply(policies -> {
+ if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
+ return
policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
+ }
+ // broker level policies
+ return
pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
+ });
+ }
+
public boolean isSystemTopic(String topic) {
return isSystemTopic(TopicName.get(topic));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c9d8a2ff700..ef7cfdf14d8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -53,6 +53,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -1034,48 +1035,53 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
}
- boolean createTopicIfDoesNotExist = forceTopicCreation
- &&
service.isAllowAutoTopicCreation(topicName.toString());
-
- service.getTopic(topicName.toString(),
createTopicIfDoesNotExist)
+ service.isAllowAutoTopicCreationAsync(topicName)
+ .thenCompose(isAllowAutoTopicCreation -> {
+ final boolean createTopicIfDoesNotExist =
forceTopicCreation && isAllowAutoTopicCreation;
+ return service.getTopic(topicName.toString(),
createTopicIfDoesNotExist);
+ })
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new
TopicNotFoundException(
"Topic " + topicName + " does
not exist"));
}
-
- Topic topic = optTopic.get();
-
- boolean rejectSubscriptionIfDoesNotExist =
isDurable
- &&
!service.isAllowAutoSubscriptionCreation(topicName.toString())
- &&
!topic.getSubscriptions().containsKey(subscriptionName)
- && topic.isPersistent();
-
- if (rejectSubscriptionIfDoesNotExist) {
- return FutureUtil
- .failedFuture(
- new
SubscriptionNotFoundException(
- "Subscription does not
exist"));
- }
-
- SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
- .subscriptionName(subscriptionName)
-
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
-
.consumerName(consumerName).isDurable(isDurable)
-
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
- .initialPosition(initialPosition)
-
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
-
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-
.subscriptionProperties(subscriptionProperties)
- .consumerEpoch(consumerEpoch)
- .build();
- if (schema != null) {
- return
topic.addSchemaIfIdleOrCheckCompatible(schema)
- .thenCompose(v ->
topic.subscribe(option));
- } else {
- return topic.subscribe(option);
- }
+ final Topic topic = optTopic.get();
+ return
service.isAllowAutoSubscriptionCreationAsync(topicName)
+
.thenCompose(isAllowAutoSubscriptionCreation -> {
+ boolean
rejectSubscriptionIfDoesNotExist = isDurable
+ &&
!isAllowAutoSubscriptionCreation
+ &&
!topic.getSubscriptions().containsKey(subscriptionName)
+ && topic.isPersistent();
+
+ if (rejectSubscriptionIfDoesNotExist) {
+ return FutureUtil
+ .failedFuture(
+ new
SubscriptionNotFoundException(
+
"Subscription does not exist"));
+ }
+
+ SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
+
.subscriptionName(subscriptionName)
+
.consumerId(consumerId).subType(subType)
+ .priorityLevel(priorityLevel)
+
.consumerName(consumerName).isDurable(isDurable)
+
.startMessageId(startMessageId).metadata(metadata)
+ .readCompacted(readCompacted)
+
.initialPosition(initialPosition)
+
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
+
.replicatedSubscriptionStateArg(isReplicated)
+ .keySharedMeta(keySharedMeta)
+
.subscriptionProperties(subscriptionProperties)
+ .consumerEpoch(consumerEpoch)
+ .build();
+ if (schema != null) {
+ return
topic.addSchemaIfIdleOrCheckCompatible(schema)
+ .thenCompose(v ->
topic.subscribe(option));
+ } else {
+ return topic.subscribe(option);
+ }
+ });
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
@@ -1286,33 +1292,39 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future
-> {
- CompletableFuture<Subscription>
createInitSubFuture;
+ CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&&
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
- if
(!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
- String msg =
- "Could not create the initial
subscription due to the auto subscription "
- + "creation is not
allowed.";
- if (producerFuture.completeExceptionally(
- new
BrokerServiceException.NotAllowedException(msg))) {
- log.warn("[{}] {}
initialSubscriptionName: {}, topic: {}",
- remoteAddress, msg,
initialSubscriptionName, topicName);
-
commandSender.sendErrorResponse(requestId,
- ServerError.NotAllowedError,
msg);
- }
- producers.remove(producerId,
producerFuture);
- return;
- }
- createInitSubFuture =
-
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
- false, null);
+ createInitSubFuture =
service.isAllowAutoSubscriptionCreationAsync(topicName)
+
.thenCompose(isAllowAutoSubscriptionCreation -> {
+ if (!isAllowAutoSubscriptionCreation) {
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.NotAllowedException(
+ "Could not create
the initial subscription due to"
+ + " the auto
subscription creation is not allowed."));
+ }
+ return
topic.createSubscription(initialSubscriptionName,
+ InitialPosition.Earliest,
false, null);
+ });
} else {
createInitSubFuture =
CompletableFuture.completedFuture(null);
}
createInitSubFuture.whenComplete((sub, ex) -> {
if (ex != null) {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
+ if (rc instanceof
BrokerServiceException.NotAllowedException) {
+ log.warn("[{}] {}
initialSubscriptionName: {}, topic: {}",
+ remoteAddress,
rc.getMessage(), initialSubscriptionName, topicName);
+ if
(producerFuture.completeExceptionally(rc)) {
+
commandSender.sendErrorResponse(requestId,
+
ServerError.NotAllowedError, rc.getMessage());
+ }
+ producers.remove(producerId,
producerFuture);
+ return;
+ }
+
String msg =
"Failed to create the initial
subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName:
{}, topic: {}",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 54f738101be..a021e7ce94d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -157,18 +157,20 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
return
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
+ final Policies policies;
if (!optPolicies.isPresent()) {
log.warn("[{}] Policies not present and
isEncryptionRequired will be set to false", topic);
isEncryptionRequired = false;
+ policies = new Policies();
} else {
- Policies policies = optPolicies.get();
+ policies = optPolicies.get();
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
schemaValidationEnforced =
policies.schema_validation_enforced;
}
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(policies);
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index cd7f5e9ea64..568ae0915cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -364,6 +364,12 @@ public class DispatchRateLimiter {
return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * {@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call. Use {@link
DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} instead.
+ */
+ @Deprecated
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5054b039af7..ec174a41d63 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
@@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -316,7 +318,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
return;
@@ -332,7 +334,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updatePublishDispatcher();
- updateResourceGroupLimiter(optPolicies);
+ updateResourceGroupLimiter(policies);
this.isEncryptionRequired = policies.encryption_required;
@@ -2419,7 +2421,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
@Override
- public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
+ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
+ requireNonNull(data);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic,
isEncryptionRequired,
data.encryption_required);
@@ -2443,7 +2446,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updatePublishDispatcher();
- this.updateResourceGroupLimiter(Optional.of(data));
+ updateResourceGroupLimiter(data);
List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 89af6f6be88..f602659400f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RateLimiter;
@@ -159,14 +160,21 @@ public class SubscribeRateLimiter {
}
/**
- * Gets configured subscribe-rate from namespace policies. Returns null if
subscribe-rate is not configured
- *
- * @return
+ * @deprecated Avoid using the deprecated method
+ * #{@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call.
*/
+ @Deprecated
public SubscribeRate getPoliciesSubscribeRate() {
return getPoliciesSubscribeRate(brokerService, topicName);
}
+ /**
+ * @deprecated Avoid using the deprecated method
+ * #{@link
org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
and blocking
+ * call.
+ */
+ @Deprecated
public static SubscribeRate getPoliciesSubscribeRate(BrokerService
brokerService, final String topicName) {
final String cluster =
brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies =
DispatchRateLimiter.getPolicies(brokerService, topicName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 5ae60762748..5906c6f4fe2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -142,6 +142,9 @@ public class HttpTopicLookupv2Test {
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();
+ BrokerService brokerService = pulsar.getBrokerService();
+ doReturn(CompletableFuture.completedFuture(false))
+ .when(brokerService).isAllowAutoTopicCreationAsync(any());
NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.complete(false);