This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch duplicated_delete
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e75a303ea5e07d259e28e7661d917738e626331f Author: xiangying <[email protected]> AuthorDate: Thu Jun 29 20:14:56 2023 +0800 [fix][branch-2.10] Fix duplicated deleting topics --- .../pulsar/broker/admin/impl/NamespacesBase.java | 22 +++++++++++++++------- .../apache/pulsar/broker/admin/AdminApi2Test.java | 12 +++++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ae7f36ad191..f2d1611786f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -339,6 +339,8 @@ public abstract class NamespacesBase extends AdminResource { return; } } + noPartitionSystemTopic.removeAll(partitionSystemTopic); + noPartitionedTopicPolicySystemTopic.removeAll(partitionedTopicPolicySystemTopic); deleteSystemTopicFuture = internalDeleteTopicsAsync(noPartitionSystemTopic) .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionSystemTopic)) .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) @@ -348,8 +350,6 @@ public abstract class NamespacesBase extends AdminResource { } deleteSystemTopicFuture - .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) - .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) .thenCompose(__ -> { List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList(); NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -523,7 +523,12 @@ public abstract class NamespacesBase extends AdminResource { } String partitionedTopic = topicName.getPartitionedTopicName(); if (!partitionedTopics.contains(partitionedTopic)) { - partitionedTopics.add(partitionedTopic); + if 
(!partitionedTopics.contains(partitionedTopic) && + !nonPartitionedTopics.contains(partitionedTopic)) { + partitionedTopics.add(partitionedTopic); + } else { + continue; + } } } else { if (pulsar().getBrokerService().isSystemTopic(topicName)) { @@ -534,7 +539,12 @@ public abstract class NamespacesBase extends AdminResource { } continue; } + if (!partitionedTopics.contains(topic) && + !nonPartitionedTopics.contains(topic)) { nonPartitionedTopics.add(topic); + } else { + continue; + } } topicFutures.add(pulsar().getAdminClient().topics().deleteAsync( topic, true, true)); @@ -550,10 +560,6 @@ public abstract class NamespacesBase extends AdminResource { } } - for (String partitionedTopic : partitionedTopics) { - topicFutures.add(namespaceResources().getPartitionedTopicResources() - .deletePartitionedTopicAsync(TopicName.get(partitionedTopic))); - } if (log.isDebugEnabled()) { log.debug("Successfully send deletion command of partitioned-topics:{} " @@ -561,6 +567,8 @@ public abstract class NamespacesBase extends AdminResource { partitionedTopics, nonPartitionedTopics, namespaceName); } + allPartitionedSystemTopics.removeAll(allSystemTopics); + partitionedTopicPolicySystemTopic.removeAll(noPartitionedTopicPolicySystemTopic); final CompletableFuture<Throwable> topicFutureEx = FutureUtil.waitForAll(topicFutures) .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index a928101c460..247c95f8418 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1419,7 +1419,8 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { return new Object[][]{ {new NamespaceAttr(false, "non-partitioned", 0, false)}, {new NamespaceAttr(true, "non-partitioned", 0, false)}, - 
{new NamespaceAttr(true, "partitioned", 3, false)} + {new NamespaceAttr(true, "partitioned", 3, false)}, + {new NamespaceAttr(true, "partitioned", 3, true)} }; } @@ -1472,11 +1473,12 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { // Expected: cannot delete non-empty tenant } - // delete topic - admin.topics().deletePartitionedTopic(topic); - + if (!conf.isForceDeleteNamespaceAllowed()) { + // delete topic + admin.topics().deletePartitionedTopic(topic); + } // delete namespace - admin.namespaces().deleteNamespace(namespace, false); + admin.namespaces().deleteNamespace(namespace, conf.isForceDeleteNamespaceAllowed()); assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
