This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e28f69e  [Enhancement] avoid duplicate deletion of schema (#11640)
e28f69e is described below

commit e28f69ecf82d40b20aed16ad03f7831bbca721b4
Author: Zhanpeng Wu <[email protected]>
AuthorDate: Sat Aug 21 19:32:38 2021 +0800

    [Enhancement] avoid duplicate deletion of schema (#11640)
    
    ### Motivation
    
    Currently when I need to delete a namespace forcedly, the 
`NamespacesBase#internalDeleteNamespaceForcefully` will firstly list all the 
topics under the namespace and then delete them concurrently without 
distinguishing wheather the topic is partitioned or non-partitioned. This 
behaivor will cause duplicate deletion of schema when the topic list contains 
partitioned topics, because all the partitions of the partitioned topic share 
the same schema.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 34 +++++++++++--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 56 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4048b4a..eb27521 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -415,14 +415,42 @@ public abstract class NamespacesBase extends 
AdminResource {
         try {
             // firstly remove all topics including system topics
             if (!topics.isEmpty()) {
+                Set<String> partitionedTopics = new HashSet<>();
+                Set<String> nonPartitionedTopics = new HashSet<>();
+
                 for (String topic : topics) {
                     try {
-                        
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            String partitionedTopic = 
topicName.getPartitionedTopicName();
+                            if (!partitionedTopics.contains(partitionedTopic)) 
{
+                                // Distinguish partitioned topic to avoid 
duplicate deletion of the same schema
+                                
futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
+                                        partitionedTopic, true, true));
+                                partitionedTopics.add(partitionedTopic);
+                            }
+                        } else {
+                            
futures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                    topic, true, true));
+                            nonPartitionedTopics.add(topic);
+                        }
                     } catch (Exception e) {
-                        log.error("[{}] Failed to force delete topic {}", 
clientAppId(), topic, e);
-                        asyncResponse.resume(new RestException(e));
+                        String errorMessage = String.format("Failed to force 
delete topic %s, "
+                                        + "but the previous deletion command 
of partitioned-topics:%s "
+                                        + "and non-partitioned-topics:%s have 
been sent out asynchronously. "
+                                        + "Reason: %s",
+                                topic, partitionedTopics, 
nonPartitionedTopics, e.getCause());
+                        log.error("[{}] {}", clientAppId(), errorMessage, e);
+                        asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, errorMessage));
+                        return;
                     }
                 }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Successfully send deletion command of 
partitioned-topics:{} "
+                                    + "and non-partitioned-topics:{} in 
namespace:{}.",
+                            partitionedTopics, nonPartitionedTopics, 
namespaceName);
+                }
             }
             // forcefully delete namespace bundles
             NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index ce6814c..556305d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,6 +45,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.Response.Status;
 import lombok.Cleanup;
@@ -1473,6 +1475,60 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws 
Exception {
+        conf.setForceDeleteNamespaceAllowed(true);
+        final String ns = "prop-xyz/distinguish-topic-type-ns";
+        final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
+        admin.namespaces().createNamespace(ns, 2);
+        admin.namespaces().createNamespace(exNs, 2);
+
+        final String p1 = "persistent://" + ns + "/p1";
+        final String p5 = "persistent://" + ns + "/p5";
+        final String np = "persistent://" + ns + "/np";
+
+        admin.topics().createPartitionedTopic(p1, 1);
+        admin.topics().createPartitionedTopic(p5, 5);
+        admin.topics().createNonPartitionedTopic(np);
+
+        final String exNp = "persistent://" + exNs + "/np";
+        admin.topics().createNonPartitionedTopic(exNp);
+        // insert an invalid topic name
+        pulsar.getLocalMetadataStore().put(
+                "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), 
Optional.empty()).join();
+
+        List<String> topics = 
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
+        List<String> exTopics = 
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();
+
+        // ensure that the topic list contains all the topics
+        List<String> allTopics = new ArrayList<>(Arrays.asList(np, 
TopicName.get(p1).getPartition(0).toString()));
+        for (int i = 0; i < 5; i++) {
+            allTopics.add(TopicName.get(p5).getPartition(i).toString());
+        }
+        Assert.assertEquals(allTopics.stream().filter(t -> 
!topics.contains(t)).count(), 0);
+        Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
+        // partition num = p1 + p5 + np
+        Assert.assertEquals(topics.size(), 1 + 5 + 1);
+        Assert.assertEquals(exTopics.size(), 1 + 1);
+
+        admin.namespaces().deleteNamespace(ns, true);
+        Arrays.asList(p1, p5, np).forEach(t -> {
+            try {
+                admin.schemas().getSchemaInfo(t);
+            } catch (PulsarAdminException e) {
+                // all the normal topics' schemas have been deleted
+                Assert.assertEquals(e.getStatusCode(), 404);
+            }
+        });
+
+        try {
+            admin.namespaces().deleteNamespace(exNs, true);
+            fail("Should fail due to invalid topic");
+        } catch (Exception e) {
+            //ok
+        }
+    }
+
+    @Test
     public void testUpdateClusterWithProxyUrl() throws Exception {
         ClusterData cluster = 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
         String clusterName = "test2";

Reply via email to