This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 5eb0ce2ac5c [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
5eb0ce2ac5c is described below

commit 5eb0ce2ac5c40ee13affebe9fc738e6b91ccc96e
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Mar 20 23:37:23 2023 +0800

    [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
    
    Co-authored-by: Jiwe Guo <[email protected]>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 47 +++++++++++++++++-----
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 36 ++++++++++++++++-
 3 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ce36d49ff..f76e8a02827 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -310,10 +311,21 @@ public abstract class NamespacesBase extends 
AdminResource {
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         // remove system topics first.
+        Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+        Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
         if (!topics.isEmpty()) {
             for (String topic : topics) {
                 try {
-                    
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                        } else {
+                            noPartitionedTopicPolicySystemTopic.add(topic);
+                        }
+                    } else {
+                        
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    }
                 } catch (Exception ex) {
                     log.error("[{}] Failed to delete system topic {}", 
clientAppId(), topic, ex);
                     asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -321,11 +333,14 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
             }
         }
-        FutureUtil.waitForAll(futures).thenCompose(__ -> {
-            List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                            .getBundles(namespaceName);
-            for (NamespaceBundle bundle : bundles.getBundles()) {
+        FutureUtil.waitForAll(futures)
+                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+                .thenCompose(ignore -> 
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                .thenCompose(__ -> {
+                    List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
+                    NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                    .getBundles(namespaceName);
+                    for (NamespaceBundle bundle : bundles.getBundles()) {
                         // check if the bundle is owned by any broker, if not 
then we do not need to delete the bundle
                 
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership
 -> {
                     if (ownership.isPresent()) {
@@ -475,13 +490,19 @@ public abstract class NamespacesBase extends 
AdminResource {
                 Set<String> nonPartitionedTopics = new HashSet<>();
                 Set<String> allSystemTopics = new HashSet<>();
                 Set<String> allPartitionedSystemTopics = new HashSet<>();
+                Set<String> noPartitionedTopicPolicySystemTopic = new 
HashSet<>();
+                Set<String> partitionedTopicPolicySystemTopic = new 
HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                                } else {
+                                    
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                }
                                 continue;
                             }
                             String partitionedTopic = 
topicName.getPartitionedTopicName();
@@ -490,12 +511,17 @@ public abstract class NamespacesBase extends 
AdminResource {
                             }
                         } else {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allSystemTopics.add(topic);
+                                if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
noPartitionedTopicPolicySystemTopic.add(topic);
+                                } else {
+                                    allSystemTopics.add(topic);
+                                }
                                 continue;
                             }
                             nonPartitionedTopics.add(topic);
                         }
-                        
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
+                        
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                topic, true, true));
                     } catch (Exception e) {
                         String errorMessage = String.format("Failed to force 
delete topic %s, "
                                         + "but the previous deletion command 
of partitioned-topics:%s "
@@ -524,6 +550,9 @@ public abstract class NamespacesBase extends AdminResource {
                                 .thenCompose((ignore) -> 
internalDeleteTopicsAsync(allSystemTopics))
                                 .thenCompose((ignore) ->
                                         
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore ->
+                                        
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
                                 .handle((result, exception) -> {
                                     if (exception != null) {
                                         if (exception.getCause() instanceof 
PulsarAdminException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8e95f2c4313..d8f56d72960 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1186,7 +1186,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
                                 .thenCompose(ignore -> {
                                     if 
(!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
-                                            && 
brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                                        && 
brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
                                         return deleteTopicPolicies();
                                     } else {
                                         return 
CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 16dfe5bc9a3..eaa54c1fd34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -57,11 +58,13 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -103,6 +107,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -1938,7 +1943,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+    public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws 
Exception {
         String namespace = this.testTenant + "/delete-systemTopic";
         String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
                 "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1958,4 +1963,33 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // 4. delete the policies topic and the topic wil not to clear topic 
polices
         admin.topics().delete(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
     }
+    @Test
+    public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        Field field = 
PulsarService.class.getDeclaredField("topicPoliciesService");
+        field.setAccessible(true);
+        field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + 
"testDeleteTopicPolicyWhenDeleteSystemTopic";
+        admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", 
"usc", "usw")));
+
+        admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(systemTopic).create();
+        admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+        Integer maxConsumerPerTopic = pulsar
+                .getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+                .getMaxConsumerPerTopic();
+
+        assertEquals(maxConsumerPerTopic, Integer.valueOf(5));
+        admin.topics().delete(systemTopic, true);
+        TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, 
TimeUnit.SECONDS);
+        assertNull(topicPolicies);
+    }
 }

Reply via email to