This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 643787e931d [fix][broker] Make `deleteTopicPolicies` serialized is
executed when close topic. (#15811)
643787e931d is described below
commit 643787e931db264b483ae7da7f2ba7c4f8e882a8
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 22 16:44:31 2022 +0800
[fix][broker] Make `deleteTopicPolicies` serialized is executed when close
topic. (#15811)
(cherry picked from commit e8ee996dd0c7a3a742117aee399b31e89e6e2d9d)
---
.../org/apache/pulsar/broker/service/BrokerService.java | 15 ++++++++++++---
.../client/api/AuthenticatedProducerConsumerTest.java | 1 +
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a3df7ed3862..b251b0d843b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2864,11 +2864,20 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
- if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ final PulsarService pulsarService = pulsar();
+ if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
- TopicName cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- return
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ return pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenComposeAsync(optPolicies -> {
+ if (optPolicies.isPresent() && optPolicies.get().deleted) {
+ // We can return the completed future directly if the
namespace is already deleted.
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ return
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ });
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 6d9135af1ac..9be5076b374 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -80,6 +80,7 @@ public class AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
+ conf.setTopicLevelPoliciesEnabled(false);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("localhost");