This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch delete_ns
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dd4ce57f3e06584e647c24445346cb77c221a117
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Dec 9 14:56:37 2022 +0800

    [fix][broker] Fix delete system topic clean topic policy (#18823)
    
    If users set topic policy for system topic, then delete this system topic, 
the topic policy should be deleted.
    
    Only change_events topic do not need to clear topic policies.
    
    (cherry picked from commit 93c41de8aac7dd655491d3b231468753d2d0a113)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 55 ++++++++++++++++------
 .../broker/service/persistent/PersistentTopic.java |  3 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 37 ++++++++++++++-
 3 files changed, 78 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ce36d49ff..d8777f97665 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -310,10 +311,21 @@ public abstract class NamespacesBase extends 
AdminResource {
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         // remove system topics first.
+        Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+        Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
         if (!topics.isEmpty()) {
             for (String topic : topics) {
                 try {
-                    
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            partitionedTopicPolicySystemTopic.add(topic);
+                        } else {
+                            noPartitionedTopicPolicySystemTopic.add(topic);
+                        }
+                    } else {
+                        
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    }
                 } catch (Exception ex) {
                     log.error("[{}] Failed to delete system topic {}", 
clientAppId(), topic, ex);
                     asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -321,11 +333,14 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
             }
         }
-        FutureUtil.waitForAll(futures).thenCompose(__ -> {
-            List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                            .getBundles(namespaceName);
-            for (NamespaceBundle bundle : bundles.getBundles()) {
+        FutureUtil.waitForAll(futures)
+                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+                .thenCompose(ignore -> 
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                .thenCompose(__ -> {
+                    List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
+                    NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                    .getBundles(namespaceName);
+                    for (NamespaceBundle bundle : bundles.getBundles()) {
                         // check if the bundle is owned by any broker, if not 
then we do not need to delete the bundle
                 
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership
 -> {
                     if (ownership.isPresent()) {
@@ -475,27 +490,41 @@ public abstract class NamespacesBase extends 
AdminResource {
                 Set<String> nonPartitionedTopics = new HashSet<>();
                 Set<String> allSystemTopics = new HashSet<>();
                 Set<String> allPartitionedSystemTopics = new HashSet<>();
+                Set<String> noPartitionedTopicPolicySystemTopic = new 
HashSet<>();
+                Set<String> partitionedTopicPolicySystemTopic = new 
HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                                } else {
+                                    
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                }
                                 continue;
                             }
                             String partitionedTopic = 
topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) 
{
+                                // Distinguish partitioned topic to avoid 
duplicate deletion of the same schema
+                                
topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
+                                        partitionedTopic, true, true));
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allSystemTopics.add(topic);
+                                if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
noPartitionedTopicPolicySystemTopic.add(topic);
+                                } else {
+                                    allSystemTopics.add(topic);
+                                }
                                 continue;
                             }
+                            
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                    topic, true, true));
                             nonPartitionedTopics.add(topic);
                         }
-                        
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
                     } catch (Exception e) {
                         String errorMessage = String.format("Failed to force 
delete topic %s, "
                                         + "but the previous deletion command 
of partitioned-topics:%s "
@@ -508,11 +537,6 @@ public abstract class NamespacesBase extends AdminResource 
{
                     }
                 }
 
-                for (String partitionedTopic : partitionedTopics) {
-                    
topicFutures.add(namespaceResources().getPartitionedTopicResources()
-                            
.deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
-                }
-
                 if (log.isDebugEnabled()) {
                     log.debug("Successfully send deletion command of 
partitioned-topics:{} "
                                     + "and non-partitioned-topics:{} in 
namespace:{}.",
@@ -524,6 +548,9 @@ public abstract class NamespacesBase extends AdminResource {
                                 .thenCompose((ignore) -> 
internalDeleteTopicsAsync(allSystemTopics))
                                 .thenCompose((ignore) ->
                                         
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore ->
+                                        
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
                                 .handle((result, exception) -> {
                                     if (exception != null) {
                                         if (exception.getCause() instanceof 
PulsarAdminException) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8e95f2c4313..c4e8ac73be2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1185,8 +1185,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
                     deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
                                 .thenCompose(ignore -> {
-                                    if 
(!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
-                                            && 
brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                                    if 
(!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                                         return deleteTopicPolicies();
                                     } else {
                                         return 
CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 16dfe5bc9a3..957088e4717 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -57,11 +58,13 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -103,6 +107,8 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -1938,7 +1944,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+    public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws 
Exception {
         String namespace = this.testTenant + "/delete-systemTopic";
         String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
                 "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1958,4 +1964,33 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // 4. delete the policies topic and the topic wil not to clear topic 
polices
         admin.topics().delete(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
     }
+    @Test
+    public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        Field field = 
PulsarService.class.getDeclaredField("topicPoliciesService");
+        field.setAccessible(true);
+        field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + 
"testDeleteTopicPolicyWhenDeleteSystemTopic";
+        admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", 
"usc", "usw")));
+
+        admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(systemTopic).create();
+        admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+        Integer maxConsumerPerTopic = pulsar
+                .getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+                .getMaxConsumerPerTopic();
+
+        assertEquals(maxConsumerPerTopic, 5);
+        admin.topics().delete(systemTopic, true);
+        TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, 
TimeUnit.SECONDS);
+        assertNull(topicPolicies);
+    }
 }

Reply via email to