This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new f10797abcdb [fix][branch-2.10] Fix duplicated deleting topics (#20685)
f10797abcdb is described below
commit f10797abcdbd9e9896e221db84990a9aa0971284
Author: Xiangying Meng <[email protected]>
AuthorDate: Sun Jul 2 11:10:43 2023 +0800
[fix][branch-2.10] Fix duplicated deleting topics (#20685)
### Motivation
1. The topics have been deleted twice.
https://github.com/apache/pulsar/blob/90369a0da639d43c841974d08ab77faeb7ba5cdd/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L342-L355
2. Similar to https://github.com/apache/pulsar/pull/20683
We need to do Deduplication.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 22 +++++++++++++++-------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 12 +++++++-----
2 files changed, 22 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ae7f36ad191..f325fd26a94 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -339,6 +339,8 @@ public abstract class NamespacesBase extends AdminResource {
return;
}
}
+ noPartitionSystemTopic.removeAll(partitionSystemTopic);
+
noPartitionedTopicPolicySystemTopic.removeAll(partitionedTopicPolicySystemTopic);
deleteSystemTopicFuture =
internalDeleteTopicsAsync(noPartitionSystemTopic)
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionSystemTopic))
.thenCompose(ignore ->
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
@@ -348,8 +350,6 @@ public abstract class NamespacesBase extends AdminResource {
}
deleteSystemTopicFuture
- .thenCompose(ignore ->
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
- .thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
.thenCompose(__ -> {
List<CompletableFuture<Void>> deleteBundleFutures =
Lists.newArrayList();
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
@@ -523,7 +523,12 @@ public abstract class NamespacesBase extends AdminResource
{
}
String partitionedTopic =
topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic))
{
- partitionedTopics.add(partitionedTopic);
+ if
(!partitionedTopics.contains(partitionedTopic)
+ &&
!nonPartitionedTopics.contains(partitionedTopic)) {
+ partitionedTopics.add(partitionedTopic);
+ } else {
+ continue;
+ }
}
} else {
if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
@@ -534,7 +539,12 @@ public abstract class NamespacesBase extends AdminResource
{
}
continue;
}
+ if (!partitionedTopics.contains(topic)
+ && !nonPartitionedTopics.contains(topic)) {
nonPartitionedTopics.add(topic);
+ } else {
+ continue;
+ }
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
@@ -550,10 +560,6 @@ public abstract class NamespacesBase extends AdminResource
{
}
}
- for (String partitionedTopic : partitionedTopics) {
-
topicFutures.add(namespaceResources().getPartitionedTopicResources()
-
.deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
- }
if (log.isDebugEnabled()) {
log.debug("Successfully send deletion command of
partitioned-topics:{} "
@@ -561,6 +567,8 @@ public abstract class NamespacesBase extends AdminResource {
partitionedTopics, nonPartitionedTopics,
namespaceName);
}
+ allPartitionedSystemTopics.removeAll(allSystemTopics);
+
partitionedTopicPolicySystemTopic.removeAll(noPartitionedTopicPolicySystemTopic);
final CompletableFuture<Throwable> topicFutureEx =
FutureUtil.waitForAll(topicFutures)
.thenCompose((ignore) ->
internalDeleteTopicsAsync(allSystemTopics))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a928101c460..247c95f8418 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1419,7 +1419,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
return new Object[][]{
{new NamespaceAttr(false, "non-partitioned", 0, false)},
{new NamespaceAttr(true, "non-partitioned", 0, false)},
- {new NamespaceAttr(true, "partitioned", 3, false)}
+ {new NamespaceAttr(true, "partitioned", 3, false)},
+ {new NamespaceAttr(true, "partitioned", 3, true)}
};
}
@@ -1472,11 +1473,12 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// Expected: cannot delete non-empty tenant
}
- // delete topic
- admin.topics().deletePartitionedTopic(topic);
-
+ if (!conf.isForceDeleteNamespaceAllowed()) {
+ // delete topic
+ admin.topics().deletePartitionedTopic(topic);
+ }
// delete namespace
- admin.namespaces().deleteNamespace(namespace, false);
+ admin.namespaces().deleteNamespace(namespace,
conf.isForceDeleteNamespaceAllowed());
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());